refactor delta&image writers to perform cleanup on Drop in the background

In #10063 we will switch BlobWriter, which underlies delta and image
layer writers, to use the owned buffers IO buffered writer.

That buffered writer implements double-buffering by virtue of a background task
that performs the flushing -- it owns the VirtualFile and both
DeltaLayerWriter and ImageLayerWriter are mere clients to it.

The implication is that it's no longer true that dropping these client
objects guarantees that all IO activity is complete. We must wait for the
flush task to exit.

In preparation for that new world, this PR moves the cleanup to a short-lived
task that is spawned from the Drop impl, and adds appropriate gate guard
holdings to hook it into the Timeline lifecycle.

We must (theoretically) worry that there will be a retry inbetween Drop
completing and the spawned task completing. It could collide on the
randomly generated temporary file name. We avoid this by switching to a
global monotonic counter.

Refs
- extracted from https://github.com/neondatabase/neon/pull/10063
- epic https://github.com/neondatabase/neon/issues/9868
This commit is contained in:
Christian Schwarz
2025-04-11 17:20:20 +02:00
parent 062c7b9a76
commit 2f0677be26
2 changed files with 56 additions and 26 deletions

View File

@@ -34,6 +34,7 @@ use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use camino::{Utf8Path, Utf8PathBuf};
@@ -45,8 +46,6 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf;
@@ -288,19 +287,19 @@ impl DeltaLayer {
key_start: Key,
lsn_range: &Range<Lsn>,
) -> Utf8PathBuf {
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
// Never reuse a filename in the lifetime of a pageserver process so that we need
// not worry about laggard Drop impl's async unlink hitting an already reused filename.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
conf.timeline_path(tenant_shard_id, timeline_id)
.join(format!(
"{}-XXX__{:016X}-{:016X}.{}.{}",
"{}-XXX__{:016X}-{:016X}.{:x}.{}",
key_start,
u64::from(lsn_range.start),
u64::from(lsn_range.end),
rand_string,
filename_disambiguator,
TEMP_FILE_SUFFIX,
))
}
@@ -395,6 +394,8 @@ struct DeltaLayerWriterInner {
// Number of key-lsns in the layer.
num_keys: usize,
_gate_guard: utils::sync::gate::GateGuard,
}
impl DeltaLayerWriterInner {
@@ -439,6 +440,7 @@ impl DeltaLayerWriterInner {
tree: tree_builder,
blob_writer,
num_keys: 0,
_gate_guard: gate.enter()?,
})
}
@@ -728,12 +730,22 @@ impl DeltaLayerWriter {
impl Drop for DeltaLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
// We want to remove the virtual file here, so it's fine to not
// having completely flushed unwritten data.
let vfile = inner.blob_writer.into_inner_no_flush();
let Some(inner) = self.inner.take() else {
return;
};
tokio::spawn(async move {
let DeltaLayerWriterInner {
blob_writer,
_gate_guard,
..
} = inner;
let vfile = blob_writer.into_inner_no_flush();
vfile.remove();
}
drop(_gate_guard);
});
}
}
@@ -1609,8 +1621,8 @@ pub(crate) mod test {
use bytes::Bytes;
use itertools::MinMaxResult;
use pageserver_api::value::Value;
use rand::RngCore;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use rand::{Rng, RngCore};
use super::*;
use crate::DEFAULT_PG_VERSION;

View File

@@ -32,6 +32,7 @@ use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use anyhow::{Context, Result, bail, ensure};
use bytes::Bytes;
@@ -43,8 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
use rand::Rng;
use rand::distributions::Alphanumeric;
use serde::{Deserialize, Serialize};
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
@@ -252,14 +251,17 @@ impl ImageLayer {
tenant_shard_id: TenantShardId,
fname: &ImageLayerName,
) -> Utf8PathBuf {
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
.map(char::from)
.collect();
// Never reuse a filename in the lifetime of a pageserver process so that we need
// not worry about laggard Drop impl's async unlink hitting an already reused filename.
static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
conf.timeline_path(&tenant_shard_id, &timeline_id)
.join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
.join(format!(
"{fname}.{:x}.{TEMP_FILE_SUFFIX}",
filename_disambiguator
))
}
///
@@ -743,6 +745,8 @@ struct ImageLayerWriterInner {
#[cfg(feature = "testing")]
last_written_key: Key,
_gate_guard: utils::sync::gate::GateGuard,
}
impl ImageLayerWriterInner {
@@ -805,6 +809,7 @@ impl ImageLayerWriterInner {
num_keys: 0,
#[cfg(feature = "testing")]
last_written_key: Key::MIN,
_gate_guard: gate.enter()?,
};
Ok(writer)
@@ -1066,9 +1071,22 @@ impl ImageLayerWriter {
impl Drop for ImageLayerWriter {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
inner.blob_writer.into_inner().remove();
}
let Some(inner) = self.inner.take() else {
return;
};
tokio::spawn(async move {
let ImageLayerWriterInner {
blob_writer,
_gate_guard,
..
} = inner;
let vfile = blob_writer.into_inner();
vfile.remove();
drop(_gate_guard);
});
}
}