Mirror of https://github.com/neondatabase/neon.git
## TLDR

This PR is a no-op: the changes are disabled by default.

## Problem

I. Currently we don't have a way to detect disk I/O failures from WAL operations.

II. We observe that the offloader can fail to upload a segment due to a race condition between XLOG SWITCH and Postgres starting to stream WAL: the wal_backup task continuously fails to upload a full segment while the segment remains partial on disk. The consequence is that commit_lsn moves forward on all SKs while backup_lsn stays the same, and eventually all SKs run out of disk space.

III. We have discovered SK bugs where the WAL offload owner cannot keep up with WAL backup/upload to S3, which results in an unbounded accumulation of WAL segment files on the Safekeeper's disk until the disk becomes full. This state is hard to recover from because the Safekeeper cannot write its control files when it is out of disk space. There are actually two problems here:
1. A single problematic timeline can take over the entire disk of the SK.
2. Once out of disk, it is difficult to recover the SK.

IV. Neon reports certain storage errors as "critical" errors using a macro, which increments a counter metric that can be used to raise alerts. However, this metric isn't sliced by tenant and/or timeline today. We need the tenant/timeline dimension to respond to incidents faster and to analyze blast radius.

## Summary of changes

I. The PR adds a `safekeeper_wal_disk_io_errors` metric which is incremented when the SK fails to create or flush WAL.

II. To mitigate the stuck-offloader issue, we re-elect a new offloader if the current offloader is lagging behind too much. Each SK makes the decision locally, but all SKs are aware of each other's commit and backup LSNs. The new algorithm is:
- `determine_offloader` picks an SK, say SK-1.
- Each SK checks: if `commit_lsn - backup_lsn > threshold`, remove SK-1 from the candidates and call `determine_offloader` again.

SK-1 steps down and all SKs elect the same new leader. After the backup catches up, the leader becomes SK-1 again. This also helps when SK-1 is simply slow to back up. I'll set the re-election backup lag to 4 GB later; it is set to 128 MB in dev to trigger the code more frequently. (A minimal sketch of this check is included below, after this summary.)

III. This change addresses problem 1 above by having the Safekeeper perform a per-timeline disk utilization check when processing WAL proposal messages from Postgres/compute. The Safekeeper now rejects the WAL proposal message, effectively stopping further WAL writes for that timeline, if the existing WAL files for the timeline on the SK disk exceed a certain size (the default threshold is 100 GB). The disk utilization is calculated from a `last_removed_segno` variable tracked by the background task that removes WAL files, which yields a conservative over-estimate (>= the actual disk usage).

IV.
* Add a new metric `hadron_critical_storage_event_count` that has `tenant_shard_id` and `timeline_id` as dimensions.
* Modify the `critical!` macro to take tenant_id and timeline_id as additional arguments and adapt existing call sites to populate the tenant shard and timeline ID fields. The `critical!` macro invocation now increments `hadron_critical_storage_event_count` with the extra dimensions (see the sketch below). (In the SK there is no notion of a tenant shard, so the tenant ID is recorded in lieu of the tenant shard ID.)
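For illustration, a minimal sketch of what the sliced counter and extended macro could look like. The metric name and label names come from this PR; the registration API (`prometheus` used directly here), the static name, and the macro body are assumptions, not the actual implementation:

```rust
use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, register_int_counter_vec};

// Critical storage events, sliced by tenant (shard) and timeline so that alerts
// can be scoped to a blast radius.
pub static HADRON_CRITICAL_STORAGE_EVENT_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "hadron_critical_storage_event_count",
        "Number of critical storage events",
        &["tenant_shard_id", "timeline_id"]
    )
    .expect("failed to register metric")
});

// Hypothetical shape of the extended `critical!` macro: it logs the message and
// bumps the per-tenant/timeline metric (the real macro also feeds the existing
// global critical-error counter).
#[macro_export]
macro_rules! critical {
    ($tenant_shard_id:expr, $timeline_id:expr, $($arg:tt)*) => {{
        tracing::error!($($arg)*);
        let tenant = $tenant_shard_id.to_string();
        let timeline = $timeline_id.to_string();
        $crate::HADRON_CRITICAL_STORAGE_EVENT_COUNT
            .with_label_values(&[tenant.as_str(), timeline.as_str()])
            .inc();
    }};
}
```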
I considered adding a separate macro to avoid merge conflicts, but I think in this case (detecting critical errors) conflicts are probably more desirable, so that we are made aware whenever Neon adds another `critical!` invocation in their code.

---------

Co-authored-by: Chen Luo <chen.luo@databricks.com>
Co-authored-by: Haoyu Huang <haoyu.huang@databricks.com>
Co-authored-by: William Huang <william.huang@databricks.com>
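The offloader re-election check described in item II of the summary, roughly sketched below. The `commit_lsn`/`backup_lsn` lag rule and the `determine_offloader` name come from the description; the `PeerInfo` shape, the `elect_offloader` helper, the threshold constant, and the placeholder election logic are simplified assumptions for illustration:

```rust
use utils::id::NodeId;
use utils::lsn::Lsn;

/// Simplified view of what each SK knows about its peers (assumed shape).
struct PeerInfo {
    sk_id: NodeId,
    commit_lsn: Lsn,
    backup_lsn: Lsn,
}

/// Re-election backup lag threshold; the PR plans 4 GB in prod, 128 MB in dev.
const REELECT_BACKUP_LAG: u64 = 128 * 1024 * 1024;

/// Placeholder for the existing deterministic election among candidates.
fn determine_offloader(candidates: &[&PeerInfo]) -> Option<NodeId> {
    candidates.iter().map(|p| p.sk_id).min_by_key(|id| id.0)
}

/// Each SK runs this locally; because all SKs see the same peer commit/backup
/// LSNs, they all arrive at the same offloader without extra coordination.
fn elect_offloader(peers: &[PeerInfo]) -> Option<NodeId> {
    let all: Vec<&PeerInfo> = peers.iter().collect();
    let leader = determine_offloader(&all)?;

    // If the current leader's backup lags its commit position by more than the
    // threshold, drop it from the candidate set and elect again.
    let lagging = peers
        .iter()
        .find(|p| p.sk_id == leader)
        .map(|p| p.commit_lsn.0.saturating_sub(p.backup_lsn.0) > REELECT_BACKUP_LAG)
        .unwrap_or(false);

    if lagging {
        let rest: Vec<&PeerInfo> = peers.iter().filter(|p| p.sk_id != leader).collect();
        determine_offloader(&rest)
    } else {
        Some(leader)
    }
}
```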
300 lines · 8.7 KiB · Rust
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::send_wal::EndWatch;
use crate::timeline::WalResidentTimeline;
use crate::wal_storage::WalReader;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use safekeeper_api::Term;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;

#[derive(PartialEq, Eq, Debug)]
pub(crate) struct WalBytes {
    /// Raw PG WAL
    pub(crate) wal: Bytes,
    /// Start LSN of [`Self::wal`]
    #[allow(dead_code)]
    pub(crate) wal_start_lsn: Lsn,
    /// End LSN of [`Self::wal`]
    pub(crate) wal_end_lsn: Lsn,
    /// End LSN of WAL available on the safekeeper.
    ///
    /// For pageservers this will be the commit LSN,
    /// while for the compute it will be the flush LSN.
    pub(crate) available_wal_end_lsn: Lsn,
}

struct PositionedWalReader {
    start: Lsn,
    end: Lsn,
    reader: Option<WalReader>,
}

/// A streaming WAL reader wrapper which can be reset while running
pub(crate) struct StreamingWalReader {
    stream: BoxStream<'static, WalOrReset>,
    start_changed_tx: tokio::sync::watch::Sender<Lsn>,
    // HADRON: Added TenantTimelineId for instrumentation purposes.
    pub(crate) ttid: TenantTimelineId,
}

pub(crate) enum WalOrReset {
    Wal(anyhow::Result<WalBytes>),
    Reset(Lsn),
}

impl WalOrReset {
    pub(crate) fn get_wal(self) -> Option<anyhow::Result<WalBytes>> {
        match self {
            WalOrReset::Wal(wal) => Some(wal),
            WalOrReset::Reset(_) => None,
        }
    }
}

impl StreamingWalReader {
    pub(crate) fn new(
        tli: WalResidentTimeline,
        term: Option<Term>,
        start: Lsn,
        end: Lsn,
        end_watch: EndWatch,
        buffer_size: usize,
    ) -> Self {
        let (start_changed_tx, start_changed_rx) = tokio::sync::watch::channel(start);
        let ttid = tli.ttid;

        let state = WalReaderStreamState {
            tli,
            wal_reader: PositionedWalReader {
                start,
                end,
                reader: None,
            },
            term,
            end_watch,
            buffer: vec![0; buffer_size],
            buffer_size,
        };

        // When a change notification is received while polling the internal
        // reader, stop polling the read future and service the change.
        let stream = futures::stream::unfold(
            (state, start_changed_rx),
            |(mut state, mut rx)| async move {
                let wal_or_reset = tokio::select! {
                    read_res = state.read() => { WalOrReset::Wal(read_res) },
                    changed_res = rx.changed() => {
                        if changed_res.is_err() {
                            return None;
                        }

                        let new_start_pos = rx.borrow_and_update();
                        WalOrReset::Reset(*new_start_pos)
                    }
                };

                if let WalOrReset::Reset(lsn) = wal_or_reset {
                    state.wal_reader.start = lsn;
                    state.wal_reader.reader = None;
                }

                Some((wal_or_reset, (state, rx)))
            },
        )
        .boxed();

        Self {
            stream,
            start_changed_tx,
            ttid,
        }
    }

    /// Reset the stream to a given position.
    pub(crate) async fn reset(&mut self, start: Lsn) {
        self.start_changed_tx.send(start).unwrap();
        while let Some(wal_or_reset) = self.stream.next().await {
            match wal_or_reset {
                WalOrReset::Reset(at) => {
                    // Stream confirmed the reset.
                    // There may only be one ongoing reset at any given time,
                    // hence the assertion.
                    assert_eq!(at, start);
                    break;
                }
                WalOrReset::Wal(_) => {
                    // Ignore WAL generated before the reset was handled.
                }
            }
        }
    }
}

impl Stream for StreamingWalReader {
    type Item = WalOrReset;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.stream).poll_next(cx)
    }
}

struct WalReaderStreamState {
    tli: WalResidentTimeline,
    wal_reader: PositionedWalReader,
    term: Option<Term>,
    end_watch: EndWatch,
    buffer: Vec<u8>,
    buffer_size: usize,
}

impl WalReaderStreamState {
    async fn read(&mut self) -> anyhow::Result<WalBytes> {
        // Create reader if needed
        if self.wal_reader.reader.is_none() {
            self.wal_reader.reader = Some(self.tli.get_walreader(self.wal_reader.start).await?);
        }

        let have_something_to_send = self.wal_reader.end > self.wal_reader.start;
        if !have_something_to_send {
            tracing::debug!(
                "Waiting for wal: start={}, end={}",
                self.wal_reader.start,
                self.wal_reader.end
            );
            self.wal_reader.end = self
                .end_watch
                .wait_for_lsn(self.wal_reader.start, self.term)
                .await?;
            tracing::debug!(
                "Done waiting for wal: start={}, end={}",
                self.wal_reader.start,
                self.wal_reader.end
            );
        }

        assert!(
            self.wal_reader.end > self.wal_reader.start,
            "nothing to send after waiting for WAL"
        );

        // Calculate chunk size
        let mut chunk_end_pos = self.wal_reader.start + self.buffer_size as u64;
        if chunk_end_pos >= self.wal_reader.end {
            chunk_end_pos = self.wal_reader.end;
        } else {
            chunk_end_pos = chunk_end_pos
                .checked_sub(chunk_end_pos.block_offset())
                .unwrap();
        }

        let send_size = (chunk_end_pos.0 - self.wal_reader.start.0) as usize;
        let buffer = &mut self.buffer[..send_size];

        // Read WAL
        let send_size = {
            let _term_guard = if let Some(t) = self.term {
                Some(self.tli.acquire_term(t).await?)
            } else {
                None
            };
            self.wal_reader
                .reader
                .as_mut()
                .unwrap()
                .read(buffer)
                .await?
        };

        let wal = Bytes::copy_from_slice(&buffer[..send_size]);
        let result = WalBytes {
            wal,
            wal_start_lsn: self.wal_reader.start,
            wal_end_lsn: self.wal_reader.start + send_size as u64,
            available_wal_end_lsn: self.wal_reader.end,
        };

        self.wal_reader.start += send_size as u64;

        Ok(result)
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use futures::StreamExt;
    use postgres_ffi::MAX_SEND_SIZE;
    use utils::id::{NodeId, TenantTimelineId};
    use utils::lsn::Lsn;

    use crate::test_utils::Env;
    use crate::wal_reader_stream::StreamingWalReader;

    #[tokio::test]
    async fn test_streaming_wal_reader_reset() {
        let _ = env_logger::builder().is_test(true).try_init();

        const SIZE: usize = 8 * 1024;
        const MSG_COUNT: usize = 200;

        let start_lsn = Lsn::from_str("0/149FD18").unwrap();
        let env = Env::new(true).unwrap();
        let tli = env
            .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
            .await
            .unwrap();

        let resident_tli = tli.wal_residence_guard().await.unwrap();
        let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None)
            .await
            .unwrap();
        let end_pos = end_watch.get();

        tracing::info!("Doing first round of reads ...");

        let mut streaming_wal_reader = StreamingWalReader::new(
            resident_tli,
            None,
            start_lsn,
            end_pos,
            end_watch,
            MAX_SEND_SIZE,
        );

        let mut before_reset = Vec::new();
        while let Some(wor) = streaming_wal_reader.next().await {
            let wal = wor.get_wal().unwrap().unwrap();
            let stop = wal.available_wal_end_lsn == wal.wal_end_lsn;
            before_reset.push(wal);

            if stop {
                break;
            }
        }

        tracing::info!("Resetting the WAL stream ...");

        streaming_wal_reader.reset(start_lsn).await;

        tracing::info!("Doing second round of reads ...");

        let mut after_reset = Vec::new();
        while let Some(wor) = streaming_wal_reader.next().await {
            let wal = wor.get_wal().unwrap().unwrap();
            let stop = wal.available_wal_end_lsn == wal.wal_end_lsn;
            after_reset.push(wal);

            if stop {
                break;
            }
        }

        assert_eq!(before_reset, after_reset);
    }
}