neon/safekeeper/src/timeline_eviction.rs
Arpad Müller 552249607d apply clippy fixes for 1.88.0 beta (#12331)
The 1.88.0 stable release is near (this Thursday). We'd like to fix most
warnings beforehand so that the compiler upgrade doesn't require
approval from too many teams.

This is therefore a preparation PR (like similar PRs before it).

There are a lot of changes for this release, mostly because the
`uninlined_format_args` lint has been added to the `style` lint group.
One can read more about the lint
[here](https://rust-lang.github.io/rust-clippy/master/#/uninlined_format_args).
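
For example, the lint asks for format arguments that are plain variables to be
inlined into the format string; the snippet below is just an illustration (the
variable name is made up), showing what `cargo clippy --fix` rewrites:

```rust
fn main() {
    let actual_len = 1024;
    // Before: flagged by clippy::uninlined_format_args
    println!("downloaded {} bytes", actual_len);
    // After the fix: the variable is inlined into the format string
    println!("downloaded {actual_len} bytes");
}
```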

The PR is the result of `cargo +beta clippy --fix` and `cargo fmt`. One
remaining warning is left for the proxy team.

---------

Co-authored-by: Conrad Ludgate <conrad@neon.tech>
2025-06-24 10:12:42 +00:00

//! Code related to evicting WAL files to remote storage.
//!
//! The actual upload is done by the partial WAL backup code. This file has
//! code to delete and re-download WAL files, and to cross-validate them with
//! the partial WAL backup if the local file is still present.
use anyhow::Context;
use camino::Utf8PathBuf;
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncWriteExt};
use tracing::{debug, info, instrument, warn};
use utils::crashsafe::durable_rename;

use crate::metrics::{
EVICTION_EVENTS_COMPLETED, EVICTION_EVENTS_STARTED, EvictionEvent, NUM_EVICTED_TIMELINES,
};
use crate::rate_limit::rand_duration;
use crate::timeline_manager::{Manager, StateSnapshot};
use crate::wal_backup;
use crate::wal_backup_partial::{self, PartialRemoteSegment};
use crate::wal_storage::wal_file_paths;

impl Manager {
/// Returns true if the timeline is ready for eviction.
/// Current criteria:
/// - no active tasks
/// - control file is flushed (no next event scheduled)
/// - no WAL residence guards
/// - no pushes to the broker
/// - last partial WAL segment is uploaded
/// - all local segments before the uploaded partial are committed and uploaded
pub(crate) fn ready_for_eviction(
&self,
next_event: &Option<tokio::time::Instant>,
state: &StateSnapshot,
) -> bool {
self.backup_task.is_none()
&& self.recovery_task.is_none()
&& self.wal_removal_task.is_none()
&& self.partial_backup_task.is_none()
&& next_event.is_none()
&& self.access_service.is_empty()
&& !self.tli_broker_active.get()
            // The partial segment containing the current flush_lsn has been
            // uploaded up to that flush_lsn.
&& !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded)
            // And it is the next one after the last removed segment. Given that
            // local WAL is removed only after it has been uploaded to s3 (and the
            // pageserver has advanced remote_consistent_lsn), which happens only
            // after the WAL is committed, this being true means all of that is done.
//
            // This also works for the first segment despite last_removed_segno
            // being 0 on init, because that 0 triggers a run of wal_removal_task,
            // on whose success the manager updates the horizon.
//
// **Note** pull_timeline functionality assumes that evicted timelines always have
// a partial segment: if we ever change this condition, must also update that code.
&& self
.partial_backup_uploaded
.as_ref()
.unwrap()
.flush_lsn
.segment_number(self.wal_seg_size)
== self.last_removed_segno + 1
    }

/// Evict the timeline to remote storage. Returns whether the eviction was successful.
#[instrument(name = "evict_timeline", skip_all)]
pub(crate) async fn evict_timeline(&mut self) -> bool {
assert!(!self.is_offloaded);
let Some(storage) = self.wal_backup.get_storage() else {
warn!("no remote storage configured, skipping uneviction");
return false;
};
let partial_backup_uploaded = match &self.partial_backup_uploaded {
Some(p) => p.clone(),
None => {
warn!("no partial backup uploaded, skipping eviction");
return false;
}
};
info!("starting eviction, using {:?}", partial_backup_uploaded);
EVICTION_EVENTS_STARTED
.with_label_values(&[EvictionEvent::Evict.into()])
.inc();
let _guard = scopeguard::guard((), |_| {
EVICTION_EVENTS_COMPLETED
.with_label_values(&[EvictionEvent::Evict.into()])
.inc();
});
if let Err(e) = do_eviction(self, &partial_backup_uploaded, &storage).await {
warn!("failed to evict timeline: {:?}", e);
return false;
}
info!("successfully evicted timeline");
NUM_EVICTED_TIMELINES.inc();
true
    }

/// Attempt to restore evicted timeline from remote storage; it must be
/// offloaded.
#[instrument(name = "unevict_timeline", skip_all)]
pub(crate) async fn unevict_timeline(&mut self) {
assert!(self.is_offloaded);
let Some(storage) = self.wal_backup.get_storage() else {
warn!("no remote storage configured, skipping uneviction");
return;
};
let partial_backup_uploaded = match &self.partial_backup_uploaded {
Some(p) => p.clone(),
None => {
warn!("no partial backup uploaded, cannot unevict");
return;
}
};
info!("starting uneviction, using {:?}", partial_backup_uploaded);
EVICTION_EVENTS_STARTED
.with_label_values(&[EvictionEvent::Restore.into()])
.inc();
let _guard = scopeguard::guard((), |_| {
EVICTION_EVENTS_COMPLETED
.with_label_values(&[EvictionEvent::Restore.into()])
.inc();
});
if let Err(e) = do_uneviction(self, &partial_backup_uploaded, &storage).await {
warn!("failed to unevict timeline: {:?}", e);
return;
}
self.evict_not_before =
tokio::time::Instant::now() + rand_duration(&self.conf.eviction_min_resident);
info!("successfully restored evicted timeline");
NUM_EVICTED_TIMELINES.dec();
}
}

/// Ensure that content matches the remote partial backup, if local segment exists.
/// Then change state in control file and in-memory. If `delete_offloaded_wal` is set,
/// delete the local segment.
async fn do_eviction(
mgr: &mut Manager,
partial: &PartialRemoteSegment,
storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
compare_local_segment_with_remote(mgr, partial, storage).await?;
mgr.tli.switch_to_offloaded(partial).await?;
// switch manager state as soon as possible
mgr.is_offloaded = true;
if mgr.conf.delete_offloaded_wal {
delete_local_segment(mgr, partial).await?;
}
Ok(())
}

/// Ensure that content matches the remote partial backup, if local segment exists.
/// Then download segment to local disk and change state in control file and in-memory.
async fn do_uneviction(
mgr: &mut Manager,
partial: &PartialRemoteSegment,
storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
// if the local segment is present, validate it
compare_local_segment_with_remote(mgr, partial, storage).await?;
// atomically download the partial segment
redownload_partial_segment(mgr, partial, storage).await?;
mgr.tli.switch_to_present().await?;
// switch manager state as soon as possible
mgr.is_offloaded = false;
Ok(())
}

/// Delete local WAL segment.
async fn delete_local_segment(mgr: &Manager, partial: &PartialRemoteSegment) -> anyhow::Result<()> {
let local_path = local_segment_path(mgr, partial);
info!("deleting WAL file to evict: {}", local_path);
tokio::fs::remove_file(&local_path).await?;
Ok(())
}

/// Redownload partial segment from remote storage.
/// The segment is downloaded to a temporary file and then renamed to the final path.
async fn redownload_partial_segment(
mgr: &Manager,
partial: &PartialRemoteSegment,
storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
let tmp_file = mgr.tli.timeline_dir().join("remote_partial.tmp");
let remote_segfile = remote_segment_path(mgr, partial);
debug!(
"redownloading partial segment: {} -> {}",
remote_segfile, tmp_file
);
let mut reader = wal_backup::read_object(storage, &remote_segfile, 0).await?;
let mut file = File::create(&tmp_file).await?;
let actual_len = tokio::io::copy(&mut reader, &mut file).await?;
let expected_len = partial.flush_lsn.segment_offset(mgr.wal_seg_size);
if actual_len != expected_len as u64 {
anyhow::bail!(
"partial downloaded {} bytes, expected {}",
actual_len,
expected_len
);
}
if actual_len > mgr.wal_seg_size as u64 {
anyhow::bail!(
"remote segment is too long: {} bytes, expected {}",
actual_len,
mgr.wal_seg_size
);
}
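    // Pad the partially-downloaded segment with zeros up to the full segment
    // size: set_len extends the file, filling the new range with zero bytes.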
file.set_len(mgr.wal_seg_size as u64).await?;
file.flush().await?;
let final_path = local_segment_path(mgr, partial);
info!("downloaded {actual_len} bytes, renaming to {final_path}");
if let Err(e) = durable_rename(&tmp_file, &final_path, !mgr.conf.no_sync).await {
        // The rename probably succeeded, but its fsync failed. Remove the
        // file to avoid using it.
tokio::fs::remove_file(tmp_file)
.await
.or_else(utils::fs_ext::ignore_not_found)?;
return Err(e.into());
}
Ok(())
}

/// Compare local WAL segment with partial WAL backup in remote storage.
/// If the local segment is not present, the function does nothing.
/// If the local segment is present, it compares the local segment with the remote one.
async fn compare_local_segment_with_remote(
mgr: &Manager,
partial: &PartialRemoteSegment,
storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
let local_path = local_segment_path(mgr, partial);
match File::open(&local_path).await {
Ok(mut local_file) => {
do_validation(mgr, &mut local_file, mgr.wal_seg_size, partial, storage)
.await
.context("validation failed")
}
Err(_) => {
info!(
"local WAL file {} is not present, skipping validation",
local_path
);
Ok(())
}
}
}

/// Compare opened local WAL segment with partial WAL backup in remote storage.
/// Validate full content of both files.
async fn do_validation(
mgr: &Manager,
file: &mut File,
wal_seg_size: usize,
partial: &PartialRemoteSegment,
storage: &GenericRemoteStorage,
) -> anyhow::Result<()> {
let local_size = file.metadata().await?.len() as usize;
if local_size != wal_seg_size {
anyhow::bail!(
"local segment size is invalid: found {}, expected {}",
local_size,
wal_seg_size
);
}
let remote_segfile = remote_segment_path(mgr, partial);
let mut remote_reader: std::pin::Pin<Box<dyn AsyncRead + Send + Sync>> =
wal_backup::read_object(storage, &remote_segfile, 0).await?;
    // remote segment should have bytes exactly up to `flush_lsn`
let expected_remote_size = partial.flush_lsn.segment_offset(mgr.wal_seg_size);
// let's compare the first `expected_remote_size` bytes
compare_n_bytes(&mut remote_reader, file, expected_remote_size).await?;
// and check that the remote segment ends here
check_end(&mut remote_reader).await?;
// if local segment is longer, the rest should be zeroes
read_n_zeroes(file, mgr.wal_seg_size - expected_remote_size).await?;
// and check that the local segment ends here
check_end(file).await?;
Ok(())
}
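
/// Local path of the partial WAL segment file that contains `partial.flush_lsn`.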
fn local_segment_path(mgr: &Manager, partial: &PartialRemoteSegment) -> Utf8PathBuf {
let flush_lsn = partial.flush_lsn;
let segno = flush_lsn.segment_number(mgr.wal_seg_size);
let (_, local_partial_segfile) =
wal_file_paths(mgr.tli.timeline_dir(), segno, mgr.wal_seg_size);
local_partial_segfile
}
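
/// Path of the uploaded partial segment in remote storage.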
fn remote_segment_path(mgr: &Manager, partial: &PartialRemoteSegment) -> RemotePath {
partial.remote_path(&mgr.tli.remote_path)
}

/// Compare first `n` bytes of two readers. If the bytes differ, return an error.
/// If the readers are shorter than `n`, return an error.
async fn compare_n_bytes<R1, R2>(reader1: &mut R1, reader2: &mut R2, n: usize) -> anyhow::Result<()>
where
R1: AsyncRead + Unpin,
R2: AsyncRead + Unpin,
{
use tokio::io::AsyncReadExt;
const BUF_SIZE: usize = 32 * 1024;
let mut buffer1 = vec![0u8; BUF_SIZE];
let mut buffer2 = vec![0u8; BUF_SIZE];
let mut offset = 0;
while offset < n {
let bytes_to_read = std::cmp::min(BUF_SIZE, n - offset);
let bytes_read1 = reader1
.read(&mut buffer1[..bytes_to_read])
.await
.with_context(|| format!("failed to read from reader1 at offset {offset}"))?;
if bytes_read1 == 0 {
anyhow::bail!("unexpected EOF from reader1 at offset {}", offset);
}
let bytes_read2 = reader2
.read_exact(&mut buffer2[..bytes_read1])
.await
.with_context(|| {
format!("failed to read {bytes_read1} bytes from reader2 at offset {offset}")
})?;
assert!(bytes_read2 == bytes_read1);
if buffer1[..bytes_read1] != buffer2[..bytes_read2] {
let diff_offset = buffer1[..bytes_read1]
.iter()
.zip(buffer2[..bytes_read2].iter())
.position(|(a, b)| a != b)
.expect("mismatched buffers, but no difference found");
anyhow::bail!("mismatch at offset {}", offset + diff_offset);
}
offset += bytes_read1;
}
Ok(())
}
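
/// Check that the reader is at EOF, i.e. it yields no more bytes.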
async fn check_end<R>(mut reader: R) -> anyhow::Result<()>
where
R: AsyncRead + Unpin,
{
use tokio::io::AsyncReadExt;
let mut buffer = [0u8; 1];
let bytes_read = reader.read(&mut buffer).await?;
if bytes_read != 0 {
anyhow::bail!("expected EOF, found bytes");
}
Ok(())
}
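
/// Read `n` bytes from the reader and check that they are all zeroes.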
async fn read_n_zeroes<R>(reader: &mut R, n: usize) -> anyhow::Result<()>
where
R: AsyncRead + Unpin,
{
use tokio::io::AsyncReadExt;
const BUF_SIZE: usize = 32 * 1024;
let mut buffer = vec![0u8; BUF_SIZE];
let mut offset = 0;
while offset < n {
let bytes_to_read = std::cmp::min(BUF_SIZE, n - offset);
let bytes_read = reader
.read(&mut buffer[..bytes_to_read])
.await
.context("expected zeroes, got read error")?;
if bytes_read == 0 {
anyhow::bail!("expected zeroes, got EOF");
}
if buffer[..bytes_read].iter().all(|&b| b == 0) {
offset += bytes_read;
} else {
anyhow::bail!("non-zero byte found");
}
}
Ok(())
}