mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 22:10:39 +00:00
walredo: apply_batch_postgres: get a backtrace whenever it encounters an error (#5541)
For 2 weeks we've seen rare, spurious, not-reproducible page
reconstruction
failures with PG16 in prod.
One of the commits we deployed this week was
Commit
commit fc467941f9
Author: Joonas Koivunen <joonas@neon.tech>
Date: Wed Oct 4 16:19:19 2023 +0300
walredo: log retryed error (#546)
With the logs from that commit, we learned that some read() or write()
system call that walredo does fails with `EAGAIN`, aka
`Resource temporarily unavailable (os error 11)`.
But we have no idea where exactly in the code we get back that error.
So, use anyhow instead of fake std::io::Error's as an easy way to get
a backtrace when the error happens, and change the logging to print
that backtrace (i.e., use `{:?}` instead of
`utils::error::report_compact_sources(e)`).
The `WalRedoError` type had to go because we add additional `.context()`
further up the call chain before we `{:?}`-print it. That additional
`.context()` further up doesn't see that there's already an
anyhow::Error
inside the `WalRedoError::ApplyWalRecords` variant, and hence captures
another backtrace and prints that one on `{:?}`-print instead of the
original one inside `WalRedoError::ApplyWalRecords`.
If we ever switch back to `report_compact_sources`, we should make sure
we have some other way to uniquely identify the places where we return
an error in the error message.
This commit is contained in:
committed by
GitHub
parent
21deb81acb
commit
dd6990567f
@@ -11,10 +11,7 @@ use std::sync::{Arc, Barrier};
|
||||
|
||||
use bytes::{Buf, Bytes};
|
||||
use pageserver::{
|
||||
config::PageServerConf,
|
||||
repository::Key,
|
||||
walrecord::NeonWalRecord,
|
||||
walredo::{PostgresRedoManager, WalRedoError},
|
||||
config::PageServerConf, repository::Key, walrecord::NeonWalRecord, walredo::PostgresRedoManager,
|
||||
};
|
||||
use utils::{id::TenantId, lsn::Lsn};
|
||||
|
||||
@@ -152,7 +149,7 @@ impl Drop for JoinOnDrop {
|
||||
}
|
||||
}
|
||||
|
||||
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> Result<(), WalRedoError>
|
||||
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> anyhow::Result<()>
|
||||
where
|
||||
I: IntoIterator<Item = Request>,
|
||||
{
|
||||
@@ -160,7 +157,7 @@ where
|
||||
input.into_iter().try_for_each(|req| {
|
||||
let page = req.execute(manager)?;
|
||||
assert_eq!(page.remaining(), 8192);
|
||||
Ok::<_, WalRedoError>(())
|
||||
anyhow::Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -473,7 +470,7 @@ struct Request {
|
||||
}
|
||||
|
||||
impl Request {
|
||||
fn execute(self, manager: &PostgresRedoManager) -> Result<Bytes, WalRedoError> {
|
||||
fn execute(self, manager: &PostgresRedoManager) -> anyhow::Result<Bytes> {
|
||||
use pageserver::walredo::WalRedoManager;
|
||||
|
||||
let Request {
|
||||
|
||||
@@ -136,9 +136,7 @@ impl From<PageReconstructError> for ApiError {
|
||||
PageReconstructError::AncestorStopping(_) => {
|
||||
ApiError::ResourceUnavailable(format!("{pre}").into())
|
||||
}
|
||||
PageReconstructError::WalRedo(pre) => {
|
||||
ApiError::InternalServerError(anyhow::Error::new(pre))
|
||||
}
|
||||
PageReconstructError::WalRedo(pre) => ApiError::InternalServerError(pre),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3448,11 +3448,8 @@ pub mod harness {
|
||||
|
||||
use crate::deletion_queue::mock::MockDeletionQueue;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
repository::Key,
|
||||
tenant::Tenant,
|
||||
walrecord::NeonWalRecord,
|
||||
walredo::{WalRedoError, WalRedoManager},
|
||||
config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
|
||||
walredo::WalRedoManager,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
@@ -3625,7 +3622,7 @@ pub mod harness {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
_pg_version: u32,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let s = format!(
|
||||
"redo for {} to get to {}, with {} and {} records",
|
||||
key,
|
||||
|
||||
@@ -370,7 +370,7 @@ pub enum PageReconstructError {
|
||||
|
||||
/// An error happened replaying WAL records
|
||||
#[error(transparent)]
|
||||
WalRedo(#[from] crate::walredo::WalRedoError),
|
||||
WalRedo(anyhow::Error),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for PageReconstructError {
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
//! any WAL records, so that even if an attacker hijacks the Postgres
|
||||
//! process, he cannot escape out of it.
|
||||
//!
|
||||
use anyhow::Context;
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use nix::poll::*;
|
||||
@@ -89,7 +90,7 @@ pub trait WalRedoManager: Send + Sync {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pg_version: u32,
|
||||
) -> Result<Bytes, WalRedoError>;
|
||||
) -> anyhow::Result<Bytes>;
|
||||
}
|
||||
|
||||
struct ProcessInput {
|
||||
@@ -140,20 +141,6 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
/// An error happened in WAL redo
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum WalRedoError {
|
||||
#[error(transparent)]
|
||||
IoError(#[from] std::io::Error),
|
||||
|
||||
#[error("cannot perform WAL redo now")]
|
||||
InvalidState,
|
||||
#[error("cannot perform WAL redo for this request")]
|
||||
InvalidRequest,
|
||||
#[error("cannot perform WAL redo for this record")]
|
||||
InvalidRecord,
|
||||
}
|
||||
|
||||
///
|
||||
/// Public interface of WAL redo manager
|
||||
///
|
||||
@@ -171,10 +158,9 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
base_img: Option<(Lsn, Bytes)>,
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pg_version: u32,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
) -> anyhow::Result<Bytes> {
|
||||
if records.is_empty() {
|
||||
error!("invalid WAL redo request with no records");
|
||||
return Err(WalRedoError::InvalidRequest);
|
||||
anyhow::bail!("invalid WAL redo request with no records");
|
||||
}
|
||||
|
||||
let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
|
||||
@@ -260,8 +246,8 @@ impl PostgresRedoManager {
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
pg_version: u32,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
|
||||
const MAX_RETRY_ATTEMPTS: u32 = 1;
|
||||
let start_time = Instant::now();
|
||||
let mut n_attempts = 0u32;
|
||||
@@ -271,7 +257,8 @@ impl PostgresRedoManager {
|
||||
|
||||
// launch the WAL redo process on first use
|
||||
if proc.is_none() {
|
||||
self.launch(&mut proc, pg_version)?;
|
||||
self.launch(&mut proc, pg_version)
|
||||
.context("launch process")?;
|
||||
}
|
||||
WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
|
||||
|
||||
@@ -279,7 +266,7 @@ impl PostgresRedoManager {
|
||||
let buf_tag = BufferTag { rel, blknum };
|
||||
let result = self
|
||||
.apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout)
|
||||
.map_err(WalRedoError::IoError);
|
||||
.context("apply_wal_records");
|
||||
|
||||
let end_time = Instant::now();
|
||||
let duration = end_time.duration_since(lock_time);
|
||||
@@ -309,15 +296,15 @@ impl PostgresRedoManager {
|
||||
// next request will launch a new one.
|
||||
if let Err(e) = result.as_ref() {
|
||||
error!(
|
||||
n_attempts,
|
||||
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}: {}",
|
||||
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
|
||||
records.len(),
|
||||
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
|
||||
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
|
||||
nbytes,
|
||||
base_img_lsn,
|
||||
lsn,
|
||||
utils::error::report_compact_sources(e),
|
||||
n_attempts,
|
||||
e,
|
||||
);
|
||||
// self.stdin only holds stdin & stderr as_raw_fd().
|
||||
// Dropping it as part of take() doesn't close them.
|
||||
@@ -354,7 +341,7 @@ impl PostgresRedoManager {
|
||||
lsn: Lsn,
|
||||
base_img: Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
let mut page = BytesMut::new();
|
||||
@@ -363,8 +350,7 @@ impl PostgresRedoManager {
|
||||
page.extend_from_slice(&fpi[..]);
|
||||
} else {
|
||||
// All the current WAL record types that we can handle require a base image.
|
||||
error!("invalid neon WAL redo request with no base image");
|
||||
return Err(WalRedoError::InvalidRequest);
|
||||
anyhow::bail!("invalid neon WAL redo request with no base image");
|
||||
}
|
||||
|
||||
// Apply all the WAL records in the batch
|
||||
@@ -392,14 +378,13 @@ impl PostgresRedoManager {
|
||||
page: &mut BytesMut,
|
||||
_record_lsn: Lsn,
|
||||
record: &NeonWalRecord,
|
||||
) -> Result<(), WalRedoError> {
|
||||
) -> anyhow::Result<()> {
|
||||
match record {
|
||||
NeonWalRecord::Postgres {
|
||||
will_init: _,
|
||||
rec: _,
|
||||
} => {
|
||||
error!("tried to pass postgres wal record to neon WAL redo");
|
||||
return Err(WalRedoError::InvalidRequest);
|
||||
anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
|
||||
}
|
||||
NeonWalRecord::ClearVisibilityMapFlags {
|
||||
new_heap_blkno,
|
||||
@@ -407,7 +392,7 @@ impl PostgresRedoManager {
|
||||
flags,
|
||||
} => {
|
||||
// sanity check that this is modifying the correct relation
|
||||
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
|
||||
assert!(
|
||||
rel.forknum == VISIBILITYMAP_FORKNUM,
|
||||
"ClearVisibilityMapFlags record on unexpected rel {}",
|
||||
@@ -445,7 +430,7 @@ impl PostgresRedoManager {
|
||||
// same effects as the corresponding Postgres WAL redo function.
|
||||
NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
|
||||
let (slru_kind, segno, blknum) =
|
||||
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
key_to_slru_block(key).context("invalid record")?;
|
||||
assert_eq!(
|
||||
slru_kind,
|
||||
SlruKind::Clog,
|
||||
@@ -495,7 +480,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
NeonWalRecord::ClogSetAborted { xids } => {
|
||||
let (slru_kind, segno, blknum) =
|
||||
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
key_to_slru_block(key).context("invalid record")?;
|
||||
assert_eq!(
|
||||
slru_kind,
|
||||
SlruKind::Clog,
|
||||
@@ -526,7 +511,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
|
||||
let (slru_kind, segno, blknum) =
|
||||
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
key_to_slru_block(key).context("invalid record")?;
|
||||
assert_eq!(
|
||||
slru_kind,
|
||||
SlruKind::MultiXactOffsets,
|
||||
@@ -559,7 +544,7 @@ impl PostgresRedoManager {
|
||||
}
|
||||
NeonWalRecord::MultixactMembersCreate { moff, members } => {
|
||||
let (slru_kind, segno, blknum) =
|
||||
key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
|
||||
key_to_slru_block(key).context("invalid record")?;
|
||||
assert_eq!(
|
||||
slru_kind,
|
||||
SlruKind::MultiXactMembers,
|
||||
@@ -759,7 +744,7 @@ impl PostgresRedoManager {
|
||||
base_img: &Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
) -> anyhow::Result<Bytes> {
|
||||
// Serialize all the messages to send the WAL redo process first.
|
||||
//
|
||||
// This could be problematic if there are millions of records to replay,
|
||||
@@ -782,10 +767,7 @@ impl PostgresRedoManager {
|
||||
{
|
||||
build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
|
||||
} else {
|
||||
return Err(Error::new(
|
||||
ErrorKind::Other,
|
||||
"tried to pass neon wal record to postgres WAL redo",
|
||||
));
|
||||
anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
|
||||
}
|
||||
}
|
||||
build_get_page_msg(tag, &mut writebuf);
|
||||
@@ -807,7 +789,7 @@ impl PostgresRedoManager {
|
||||
writebuf: &[u8],
|
||||
mut input: MutexGuard<Option<ProcessInput>>,
|
||||
wal_redo_timeout: Duration,
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let proc = input.as_mut().unwrap();
|
||||
let mut nwrite = 0usize;
|
||||
let stdout_fd = proc.stdout_fd;
|
||||
@@ -831,7 +813,7 @@ impl PostgresRedoManager {
|
||||
}?;
|
||||
|
||||
if n == 0 {
|
||||
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
|
||||
anyhow::bail!("WAL redo timed out");
|
||||
}
|
||||
|
||||
// If we have some messages in stderr, forward them to the log.
|
||||
@@ -855,10 +837,7 @@ impl PostgresRedoManager {
|
||||
continue;
|
||||
}
|
||||
} else if err_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stderr unexpectedly",
|
||||
));
|
||||
anyhow::bail!("WAL redo process closed its stderr unexpectedly");
|
||||
}
|
||||
|
||||
// If 'stdin' is writeable, do write.
|
||||
@@ -867,10 +846,7 @@ impl PostgresRedoManager {
|
||||
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
|
||||
} else if in_revents.contains(PollFlags::POLLHUP) {
|
||||
// We still have more data to write, but the process closed the pipe.
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdin unexpectedly",
|
||||
));
|
||||
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
|
||||
}
|
||||
}
|
||||
let request_no = proc.n_requests;
|
||||
@@ -901,10 +877,7 @@ impl PostgresRedoManager {
|
||||
//
|
||||
// Cross-read this with the comment in apply_batch_postgres if result.is_err().
|
||||
// That's where we kill the child process.
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdout unexpectedly",
|
||||
));
|
||||
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
|
||||
}
|
||||
let n_processed_responses = output.n_processed_responses;
|
||||
while n_processed_responses + output.pending_responses.len() <= request_no {
|
||||
@@ -923,7 +896,7 @@ impl PostgresRedoManager {
|
||||
}?;
|
||||
|
||||
if n == 0 {
|
||||
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
|
||||
anyhow::bail!("WAL redo timed out");
|
||||
}
|
||||
|
||||
// If we have some messages in stderr, forward them to the log.
|
||||
@@ -947,10 +920,7 @@ impl PostgresRedoManager {
|
||||
continue;
|
||||
}
|
||||
} else if err_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stderr unexpectedly",
|
||||
));
|
||||
anyhow::bail!("WAL redo process closed its stderr unexpectedly");
|
||||
}
|
||||
|
||||
// If we have some data in stdout, read it to the result buffer.
|
||||
@@ -958,10 +928,7 @@ impl PostgresRedoManager {
|
||||
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
|
||||
} else if out_revents.contains(PollFlags::POLLHUP) {
|
||||
return Err(Error::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"WAL redo process closed its stdout unexpectedly",
|
||||
));
|
||||
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
|
||||
}
|
||||
}
|
||||
output
|
||||
|
||||
Reference in New Issue
Block a user