Files
neon/safekeeper/src/control_file.rs
HaoyuHuang 4db934407a SK changes #1 (#12448)
## TLDR
This PR is effectively a no-op: the changes are disabled by default.

## Problem
I. Currently we don't have a way to detect disk I/O failures from WAL
operations.

II.
We have observed the offloader failing to upload a segment due to a race
condition between XLOG SWITCH and Postgres starting to stream WAL: the
wal_backup task keeps failing to upload a full segment because the segment
remains partial on disk.

The consequence is that commit_lsn on all SKs moves forward while
backup_lsn stays the same, and eventually all SKs run out of disk space.

III.
We have discovered SK bugs where the WAL offload owner cannot keep up
with WAL backup/upload to S3, which results in an unbounded accumulation
of WAL segment files on the Safekeeper's disk until the disk becomes
full. This is a dangerous situation that is hard to recover from, because
the Safekeeper cannot write its control files when it is out of disk
space. There are actually two problems here:

1. A single problematic timeline can take over the entire disk of the SK.
2. Once out of disk space, it is difficult to recover the SK.


IV. 
Neon reports certain storage errors as "critical" errors using a marco,
which will increment a counter/metric that can be used to raise alerts.
However, this metric isn't sliced by tenant and/or timeline today. We
need the tenant/timeline dimension to better respond to incidents and
for blast radius analysis.

## Summary of changes
I. 
The PR adds a `safekeeper_wal_disk_io_errors` counter, which is incremented
when the SK fails to create or flush WAL files.
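
As a rough sketch, the counter could be defined along these lines (using the
`prometheus`/`once_cell` registration pattern directly here; the actual
definition lives in the safekeeper's `metrics` module and may use the
workspace wrappers instead):

```rust
use once_cell::sync::Lazy;
use prometheus::{IntCounter, register_int_counter};

// Global counter bumped on WAL/control-file disk I/O failures. The metric name
// matches the one described above; the registration boilerplate is illustrative.
pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
    register_int_counter!(
        "safekeeper_wal_disk_io_errors",
        "Disk I/O errors encountered while creating or flushing WAL and control files"
    )
    .expect("failed to register safekeeper_wal_disk_io_errors")
});
```

Call sites simply do `WAL_DISK_IO_ERRORS.inc();` before propagating the error,
as the `persist()` implementation below shows.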

II. 
To mitigate this issue, we re-elect a new offloader if the current
offloader is lagging behind too much. Each SK makes the decision locally,
but all SKs are aware of each other's commit and backup LSNs.

The new algorithm is (see the sketch below):
- determine_offloader picks an SK, say SK-1.
- Each SK then checks: if commit_lsn - backup_lsn > threshold, it removes
  SK-1 from the candidate set and calls determine_offloader again.

SK-1 thus steps down and all SKs elect the same new leader. After the
backup catches up, SK-1 becomes the leader again. This also helps when
SK-1 is merely slow to back up.

I'll set the re-election backup lag to 4 GB later; it is set to 128 MB in
dev to trigger the code path more frequently.
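
The per-safekeeper decision can be sketched roughly as follows (LSNs are shown
as plain `u64` and all names are illustrative, not the actual wal_backup code):

```rust
use std::collections::HashSet;

/// Illustrative view of the peer state exchanged between safekeepers.
struct PeerInfo {
    sk_id: u64,
    commit_lsn: u64,
    backup_lsn: u64,
}

/// Deterministic choice (e.g. lowest id) among peers not yet excluded.
fn determine_offloader(peers: &[PeerInfo], excluded: &HashSet<u64>) -> Option<u64> {
    peers
        .iter()
        .filter(|p| !excluded.contains(&p.sk_id))
        .map(|p| p.sk_id)
        .min()
}

/// If the elected offloader's backup lags its commit by more than `reelect_lag`
/// bytes, step it down and elect again. Every safekeeper runs this on the same
/// peer state, so they all converge on the same offloader.
fn elect_offloader(peers: &[PeerInfo], reelect_lag: u64) -> Option<u64> {
    let mut excluded = HashSet::new();
    loop {
        let candidate = determine_offloader(peers, &excluded)?;
        let info = peers.iter().find(|p| p.sk_id == candidate)?;
        if info.commit_lsn.saturating_sub(info.backup_lsn) > reelect_lag {
            // Candidate is lagging too far behind: exclude it and re-elect.
            excluded.insert(candidate);
            continue;
        }
        return Some(candidate);
    }
}
```

Once the lagging safekeeper's backup catches up, it is no longer excluded and
wins the election again.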

III. 
This change addresses problem no. 1 by having the Safekeeper perform a
timeline disk utilization check when processing WAL proposal messages
from Postgres/compute. The Safekeeper now rejects the WAL proposal
message, effectively stopping further WAL writes for the timeline, if
the existing WAL files for the timeline on the SK disk exceed a certain
size (the default threshold is 100 GB). The disk utilization is computed
from the `last_removed_segno` variable tracked by the background task
that removes WAL files, which yields a conservative estimate (greater
than or equal to the actual disk usage).
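
A minimal sketch of the arithmetic behind the check, assuming the
segment-based bookkeeping described above (function names and the threshold
parameter are illustrative):

```rust
/// Conservative estimate of the WAL kept on disk for one timeline: everything
/// between the last removed segment and the segment containing `flush_lsn` is
/// assumed to still be present, so the estimate is >= the actual usage.
fn timeline_wal_usage_bytes(flush_lsn: u64, last_removed_segno: u64, wal_seg_size: u64) -> u64 {
    let flush_segno = flush_lsn / wal_seg_size;
    flush_segno.saturating_sub(last_removed_segno) * wal_seg_size
}

/// Reject the WAL proposal once the estimate exceeds the configured limit
/// (100 GB by default, per the description above).
fn should_reject_wal_proposal(
    flush_lsn: u64,
    last_removed_segno: u64,
    wal_seg_size: u64,
    max_timeline_disk_usage_bytes: u64,
) -> bool {
    timeline_wal_usage_bytes(flush_lsn, last_removed_segno, wal_seg_size)
        > max_timeline_disk_usage_bytes
}
```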


IV.
* Add a new metric `hadron_critical_storage_event_count` that has
`tenant_shard_id` and `timeline_id` as dimensions.
* Modify the `critical!` macro to take the tenant ID and timeline ID as
additional arguments, and adapt existing call sites to populate the
tenant shard and timeline ID fields. The `critical!` macro invocation now
increments `hadron_critical_storage_event_count` with the extra
dimensions, as sketched below. (The SK has no notion of a tenant shard,
so the tenant ID is recorded in lieu of the tenant shard ID.)
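
The sketch below shows the general shape of the labeled counter and the
extended macro; it is an approximation assuming prometheus-style registration,
not the exact macro in Neon's shared utils:

```rust
use once_cell::sync::Lazy;
use prometheus::{IntCounterVec, register_int_counter_vec};

// Critical-event counter sliced by tenant shard and timeline (illustrative
// registration; the real one lives next to the existing critical-error metric).
pub static HADRON_CRITICAL_STORAGE_EVENT_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "hadron_critical_storage_event_count",
        "Critical storage events, by tenant shard and timeline",
        &["tenant_shard_id", "timeline_id"]
    )
    .expect("failed to register hadron_critical_storage_event_count")
});

// Approximate shape of the extended macro: bump the labeled counter, then log.
#[macro_export]
macro_rules! critical {
    ($tenant_shard_id:expr, $timeline_id:expr, $($arg:tt)*) => {{
        $crate::HADRON_CRITICAL_STORAGE_EVENT_COUNT
            .with_label_values(&[
                $tenant_shard_id.to_string().as_str(),
                $timeline_id.to_string().as_str(),
            ])
            .inc();
        tracing::error!($($arg)*);
    }};
}
```

On the safekeeper, the tenant ID string is passed where the tenant shard ID
would go, since SKs have no shard notion.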

I considered adding a separate macro to avoid merge conflicts, but in
this case (detecting critical errors) conflicts are probably desirable,
so that we become aware whenever Neon adds another `critical!` invocation
in their code.

---------

Co-authored-by: Chen Luo <chen.luo@databricks.com>
Co-authored-by: Haoyu Huang <haoyu.huang@databricks.com>
Co-authored-by: William Huang <william.huang@databricks.com>
2025-07-03 14:32:53 +00:00

298 lines
10 KiB
Rust

//! Control file serialization, deserialization and persistence.

use std::future::Future;
use std::io::Read;
use std::ops::Deref;
use std::path::Path;
use std::time::Instant;

use anyhow::{Context, Result, bail, ensure};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use camino::{Utf8Path, Utf8PathBuf};
use safekeeper_api::membership::INVALID_GENERATION;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use utils::bin_ser::LeSer;
use utils::crashsafe::durable_rename;

use crate::control_file_upgrade::{downgrade_v10_to_v9, upgrade_control_file};
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
use crate::metrics::WAL_DISK_IO_ERRORS;
use crate::state::{EvictionState, TimelinePersistentState};

pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 10;

// contains persistent metadata for safekeeper
pub const CONTROL_FILE_NAME: &str = "safekeeper.control";
// needed to atomically update the state using `rename`
const CONTROL_FILE_NAME_PARTIAL: &str = "safekeeper.control.partial";

pub const CHECKSUM_SIZE: usize = size_of::<u32>();
/// Storage should keep actual state inside of it. It should implement Deref
/// trait to access state fields and have persist method for updating that state.
pub trait Storage: Deref<Target = TimelinePersistentState> {
    /// Persist safekeeper state on disk and update internal state.
    fn persist(&mut self, s: &TimelinePersistentState) -> impl Future<Output = Result<()>> + Send;

    /// Timestamp of last persist.
    fn last_persist_at(&self) -> Instant;
}

#[derive(Debug)]
pub struct FileStorage {
    // save timeline dir to avoid reconstructing it every time
    timeline_dir: Utf8PathBuf,
    no_sync: bool,

    /// Last state persisted to disk.
    state: TimelinePersistentState,

    /// Not preserved across restarts.
    last_persist_at: Instant,
}
impl FileStorage {
    /// Initialize storage by loading state from disk.
    pub fn restore_new(timeline_dir: &Utf8Path, no_sync: bool) -> Result<FileStorage> {
        let state = Self::load_control_file_from_dir(timeline_dir)?;

        Ok(FileStorage {
            timeline_dir: timeline_dir.to_path_buf(),
            no_sync,
            state,
            last_persist_at: Instant::now(),
        })
    }

    /// Create and reliably persist new control file at given location.
    ///
    /// Note: we normally call this in temp directory for atomic init, so
    /// interested in FileStorage as a result only in tests.
    pub async fn create_new(
        timeline_dir: &Utf8Path,
        state: TimelinePersistentState,
        no_sync: bool,
    ) -> Result<FileStorage> {
        // we don't support creating new timelines in offloaded state
        assert!(matches!(state.eviction_state, EvictionState::Present));

        let mut store = FileStorage {
            timeline_dir: timeline_dir.to_path_buf(),
            no_sync,
            state: state.clone(),
            last_persist_at: Instant::now(),
        };

        store.persist(&state).await?;
        Ok(store)
    }

    /// Check the magic/version in the on-disk data and deserialize it, if possible.
    fn deser_sk_state(buf: &mut &[u8]) -> Result<TimelinePersistentState> {
        // Read the version independent part
        let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
        if magic != SK_MAGIC {
            bail!(
                "bad control file magic: {:X}, expected {:X}",
                magic,
                SK_MAGIC
            );
        }
        let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?;
        if version == SK_FORMAT_VERSION {
            let res = TimelinePersistentState::des(buf)?;
            return Ok(res);
        }
        // try to upgrade
        upgrade_control_file(buf, version)
    }

    /// Load control file from given directory.
    fn load_control_file_from_dir(timeline_dir: &Utf8Path) -> Result<TimelinePersistentState> {
        let path = timeline_dir.join(CONTROL_FILE_NAME);
        Self::load_control_file(path)
    }

    /// Read in the control file.
    pub fn load_control_file<P: AsRef<Path>>(
        control_file_path: P,
    ) -> Result<TimelinePersistentState> {
        let mut control_file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .open(&control_file_path)
            .with_context(|| {
                format!(
                    "failed to open control file at {}",
                    control_file_path.as_ref().display(),
                )
            })?;

        let mut buf = Vec::new();
        control_file
            .read_to_end(&mut buf)
            .context("failed to read control file")?;

        let calculated_checksum = crc32c::crc32c(&buf[..buf.len() - CHECKSUM_SIZE]);

        let expected_checksum_bytes: &[u8; CHECKSUM_SIZE] =
            buf[buf.len() - CHECKSUM_SIZE..].try_into()?;
        let expected_checksum = u32::from_le_bytes(*expected_checksum_bytes);

        ensure!(
            calculated_checksum == expected_checksum,
            format!(
                "safekeeper control file checksum mismatch: expected {} got {}",
                expected_checksum, calculated_checksum
            )
        );

        let state = FileStorage::deser_sk_state(&mut &buf[..buf.len() - CHECKSUM_SIZE])
            .with_context(|| {
                format!(
                    "while reading control file {}",
                    control_file_path.as_ref().display(),
                )
            })?;
        Ok(state)
    }
}
impl Deref for FileStorage {
    type Target = TimelinePersistentState;

    fn deref(&self) -> &Self::Target {
        &self.state
    }
}

impl TimelinePersistentState {
    pub(crate) fn write_to_buf(&self) -> Result<Vec<u8>> {
        let mut buf: Vec<u8> = Vec::new();
        WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;

        if self.mconf.generation == INVALID_GENERATION {
            // Temp hack for forward compatibility test: in case of none
            // configuration save cfile in previous v9 format.
            const PREV_FORMAT_VERSION: u32 = 9;
            let prev = downgrade_v10_to_v9(self);
            WriteBytesExt::write_u32::<LittleEndian>(&mut buf, PREV_FORMAT_VERSION)?;
            prev.ser_into(&mut buf)?;
        } else {
            // otherwise, we write the current format version
            WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
            self.ser_into(&mut buf)?;
        }

        // calculate checksum before resize
        let checksum = crc32c::crc32c(&buf);
        buf.extend_from_slice(&checksum.to_le_bytes());
        Ok(buf)
    }
}
impl Storage for FileStorage {
    /// Persists state durably to the underlying storage.
    async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
        // start timer for metrics
        let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer();

        // write data to safekeeper.control.partial
        let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
        let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
            /* BEGIN_HADRON */
            WAL_DISK_IO_ERRORS.inc();
            /* END_HADRON */
            format!(
                "failed to create partial control file at: {}",
                &control_partial_path
            )
        })?;

        let buf: Vec<u8> = s.write_to_buf()?;

        control_partial.write_all(&buf).await.with_context(|| {
            /* BEGIN_HADRON */
            WAL_DISK_IO_ERRORS.inc();
            /* END_HADRON */
            format!("failed to write safekeeper state into control file at: {control_partial_path}")
        })?;
        control_partial.flush().await.with_context(|| {
            /* BEGIN_HADRON */
            WAL_DISK_IO_ERRORS.inc();
            /* END_HADRON */
            format!("failed to flush safekeeper state into control file at: {control_partial_path}")
        })?;

        let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
        durable_rename(&control_partial_path, &control_path, !self.no_sync)
            .await
            /* BEGIN_HADRON */
            .inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
        /* END_HADRON */

        // update internal state
        self.state = s.clone();
        Ok(())
    }

    fn last_persist_at(&self) -> Instant {
        self.last_persist_at
    }
}
#[cfg(test)]
mod test {
    use safekeeper_api::membership::{Configuration, MemberSet, SafekeeperGeneration};
    use tokio::fs;
    use utils::lsn::Lsn;

    use super::*;

    const NO_SYNC: bool = true;

    #[tokio::test]
    async fn test_read_write_safekeeper_state() -> anyhow::Result<()> {
        let tempdir = camino_tempfile::tempdir()?;
        let mut state = TimelinePersistentState::empty();
        state.mconf = Configuration {
            generation: SafekeeperGeneration::new(42),
            members: MemberSet::empty(),
            new_members: None,
        };
        let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;

        // Make a change.
        state.commit_lsn = Lsn(42);
        storage.persist(&state).await?;

        // Reload the state. It should match the previously persisted state.
        let loaded_state = FileStorage::load_control_file_from_dir(tempdir.path())?;
        assert_eq!(loaded_state, state);
        Ok(())
    }

    #[tokio::test]
    async fn test_safekeeper_state_checksum_mismatch() -> anyhow::Result<()> {
        let tempdir = camino_tempfile::tempdir()?;
        let mut state = TimelinePersistentState::empty();
        let mut storage = FileStorage::create_new(tempdir.path(), state.clone(), NO_SYNC).await?;

        // Make a change.
        state.commit_lsn = Lsn(42);
        storage.persist(&state).await?;

        // Change the first byte to fail checksum validation.
        let ctrl_path = tempdir.path().join(CONTROL_FILE_NAME);
        let mut data = fs::read(&ctrl_path).await?;
        data[0] += 1;
        fs::write(&ctrl_path, &data).await?;

        // Loading the file should fail checksum validation.
        if let Err(err) = FileStorage::load_control_file_from_dir(tempdir.path()) {
            assert!(err.to_string().contains("control file checksum mismatch"))
        } else {
            panic!("expected checksum error")
        }
        Ok(())
    }
}