From 062c7b9a76ed1eb3a4084298a990893d73c9bca4 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Fri, 11 Apr 2025 17:00:00 +0200
Subject: [PATCH 1/2] refactor: plumb gate and cancellation down to
 blob_io::BlobWriter

In #10063 we will switch BlobWriter to use the owned buffers IO buffered
writer, which implements double-buffering by virtue of a background task
that performs the flushing.

That task's lifecycle must be contained within the Timeline lifecycle, so
it must hold the timeline gate open and respect Timeline::cancel.

This PR does the noisy plumbing ahead of time, to reduce the size of the
#10063 diff.

Refs

- extracted from https://github.com/neondatabase/neon/pull/10063
- epic https://github.com/neondatabase/neon/issues/9868
---
 pageserver/benches/bench_ingest.rs            |  2 +-
 pageserver/src/tenant/blob_io.rs              | 13 +++++-
 .../storage_layer/batch_split_writer.rs       | 44 +++++++++++++++++--
 .../src/tenant/storage_layer/delta_layer.rs   | 17 ++++++-
 .../src/tenant/storage_layer/image_layer.rs   | 28 ++++++++++--
 .../tenant/storage_layer/inmemory_layer.rs    |  4 ++
 pageserver/src/tenant/timeline.rs             | 14 +++++-
 pageserver/src/tenant/timeline/compaction.rs  | 20 ++++++++-
 .../src/tenant/timeline/detach_ancestor.rs    |  4 ++
 .../src/tenant/timeline/import_pgdata/flow.rs |  2 +
 10 files changed, 134 insertions(+), 14 deletions(-)

diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs
index 000938b189..3108b5351f 100644
--- a/pageserver/benches/bench_ingest.rs
+++ b/pageserver/benches/bench_ingest.rs
@@ -126,7 +126,7 @@ async fn ingest(
         max_concurrency: NonZeroUsize::new(1).unwrap(),
     });
     let (_desc, path) = layer
-        .write_to_disk(&ctx, None, l0_flush_state.inner())
+        .write_to_disk(&ctx, None, l0_flush_state.inner(), &gate, cancel.clone())
         .await?
         .unwrap();
     tokio::fs::remove_file(path).await?;
diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index b0b2a16c2f..abeaa166a4 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -22,6 +22,7 @@ use bytes::{BufMut, BytesMut};
 use pageserver_api::models::ImageCompressionAlgorithm;
 use tokio::io::AsyncWriteExt;
 use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice};
+use tokio_util::sync::CancellationToken;
 use tracing::warn;
 
 use crate::context::RequestContext;
@@ -169,7 +170,13 @@ pub struct BlobWriter<const BUFFERED: bool> {
 }
 
 impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
-    pub fn new(inner: VirtualFile, start_offset: u64) -> Self {
+    pub fn new(
+        inner: VirtualFile,
+        start_offset: u64,
+        _gate: &utils::sync::gate::Gate,
+        _cancel: CancellationToken,
+        _ctx: &RequestContext,
+    ) -> Self {
         Self {
             inner,
             offset: start_offset,
@@ -432,12 +439,14 @@ pub(crate) mod tests {
     ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec<u64>), Error> {
         let temp_dir = camino_tempfile::tempdir()?;
         let pathbuf = temp_dir.path().join("file");
+        let gate = utils::sync::gate::Gate::default();
+        let cancel = CancellationToken::new();
 
         // Write part (in block to drop the file)
         let mut offsets = Vec::new();
         {
             let file = VirtualFile::create(pathbuf.as_path(), ctx).await?;
-            let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
+            let mut wtr = BlobWriter::<BUFFERED>::new(file, 0, &gate, cancel.clone(), ctx);
             for blob in blobs.iter() {
                 let (_, res) = if compression {
                     let res = wtr
diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs
index 29ada15c36..39cd02d101 100644
--- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs
+++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs
@@ -5,6 +5,7 @@ use std::sync::Arc;
 
 use bytes::Bytes;
 use pageserver_api::key::{KEY_SIZE, Key};
 use pageserver_api::value::Value;
+use tokio_util::sync::CancellationToken;
 use utils::id::TimelineId;
 use utils::lsn::Lsn;
 use utils::shard::TenantShardId;
@@ -179,7 +180,7 @@ impl BatchLayerWriter {
 
 /// An image writer that takes images and produces multiple image layers.
 #[must_use]
-pub struct SplitImageLayerWriter {
+pub struct SplitImageLayerWriter<'a> {
     inner: ImageLayerWriter,
     target_layer_size: u64,
     lsn: Lsn,
@@ -188,9 +189,12 @@ pub struct SplitImageLayerWriter {
     tenant_shard_id: TenantShardId,
     batches: BatchLayerWriter,
     start_key: Key,
+    gate: &'a utils::sync::gate::Gate,
+    cancel: CancellationToken,
 }
 
-impl SplitImageLayerWriter {
+impl<'a> SplitImageLayerWriter<'a> {
+    #[allow(clippy::too_many_arguments)]
     pub async fn new(
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
@@ -198,6 +202,8 @@ impl SplitImageLayerWriter {
         start_key: Key,
         lsn: Lsn,
         target_layer_size: u64,
+        gate: &'a utils::sync::gate::Gate,
+        cancel: CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<Self> {
         Ok(Self {
@@ -208,6 +214,8 @@ impl SplitImageLayerWriter {
                 tenant_shard_id,
                 &(start_key..Key::MAX),
                 lsn,
+                gate,
+                cancel.clone(),
                 ctx,
             )
             .await?,
@@ -217,6 +225,8 @@ impl SplitImageLayerWriter {
             batches: BatchLayerWriter::new(conf).await?,
             lsn,
             start_key,
+            gate,
+            cancel,
         })
     }
@@ -239,6 +249,8 @@ impl SplitImageLayerWriter {
             self.tenant_shard_id,
             &(key..Key::MAX),
             self.lsn,
+            self.gate,
+            self.cancel.clone(),
             ctx,
         )
         .await?;
@@ -291,7 +303,7 @@ impl SplitImageLayerWriter {
 /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
 /// will split them into multiple files based on size.
 #[must_use]
-pub struct SplitDeltaLayerWriter {
+pub struct SplitDeltaLayerWriter<'a> {
     inner: Option<(Key, DeltaLayerWriter)>,
     target_layer_size: u64,
     conf: &'static PageServerConf,
@@ -300,15 +312,19 @@ pub struct SplitDeltaLayerWriter {
     lsn_range: Range<Lsn>,
     last_key_written: Key,
     batches: BatchLayerWriter,
+    gate: &'a utils::sync::gate::Gate,
+    cancel: CancellationToken,
 }
 
-impl SplitDeltaLayerWriter {
+impl<'a> SplitDeltaLayerWriter<'a> {
     pub async fn new(
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
         tenant_shard_id: TenantShardId,
         lsn_range: Range<Lsn>,
         target_layer_size: u64,
+        gate: &'a utils::sync::gate::Gate,
+        cancel: CancellationToken,
     ) -> anyhow::Result<Self> {
         Ok(Self {
             target_layer_size,
@@ -319,6 +335,8 @@ impl SplitDeltaLayerWriter {
             lsn_range,
             last_key_written: Key::MIN,
             batches: BatchLayerWriter::new(conf).await?,
+            gate,
+            cancel,
         })
     }
@@ -344,6 +362,8 @@ impl SplitDeltaLayerWriter {
                     self.tenant_shard_id,
                     key,
                     self.lsn_range.clone(),
+                    self.gate,
+                    self.cancel.clone(),
                     ctx,
                 )
                 .await?,
@@ -362,6 +382,8 @@ impl SplitDeltaLayerWriter {
                 self.tenant_shard_id,
                 key,
                 self.lsn_range.clone(),
+                self.gate,
+                self.cancel.clone(),
                 ctx,
             )
             .await?;
@@ -469,6 +491,8 @@ mod tests {
             get_key(0),
             Lsn(0x18),
             4 * 1024 * 1024,
+            &tline.gate,
+            tline.cancel.clone(),
             &ctx,
         )
         .await
         .unwrap();
@@ -480,6 +504,8 @@ mod tests {
             tenant.tenant_shard_id,
             Lsn(0x18)..Lsn(0x20),
             4 * 1024 * 1024,
+            &tline.gate,
+            tline.cancel.clone(),
         )
         .await
         .unwrap();
@@ -546,6 +572,8 @@ mod tests {
             get_key(0),
             Lsn(0x18),
             4 * 1024 * 1024,
+            &tline.gate,
+            tline.cancel.clone(),
             &ctx,
         )
         .await
         .unwrap();
@@ -556,6 +584,8 @@ mod tests {
             tenant.tenant_shard_id,
             Lsn(0x18)..Lsn(0x20),
             4 * 1024 * 1024,
+            &tline.gate,
+            tline.cancel.clone(),
         )
         .await
         .unwrap();
@@ -643,6 +673,8 @@ mod tests {
             get_key(0),
             Lsn(0x18),
             4 * 1024,
+            &tline.gate,
+            tline.cancel.clone(),
             &ctx,
         )
         .await
         .unwrap();
@@ -654,6 +686,8 @@ mod tests {
             tenant.tenant_shard_id,
             Lsn(0x18)..Lsn(0x20),
             4 * 1024,
+            &tline.gate,
+            tline.cancel.clone(),
         )
         .await
         .unwrap();
@@ -730,6 +764,8 @@ mod tests {
             tenant.tenant_shard_id,
             Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
             4 * 1024 * 1024,
+            &tline.gate,
+            tline.cancel.clone(),
         )
         .await
         .unwrap();
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 05b0bc1a5c..4417b8aa51 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -50,6 +50,7 @@ use rand::distributions::Alphanumeric;
 use serde::{Deserialize, Serialize};
 use tokio::sync::OnceCell;
 use tokio_epoll_uring::IoBuf;
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::bin_ser::BeSer;
 use utils::id::{TenantId, TimelineId};
@@ -400,12 +401,15 @@ impl DeltaLayerWriterInner {
     ///
     /// Start building a new delta layer.
     ///
+    #[allow(clippy::too_many_arguments)]
     async fn new(
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
         tenant_shard_id: TenantShardId,
         key_start: Key,
         lsn_range: Range<Lsn>,
+        gate: &utils::sync::gate::Gate,
+        cancel: CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<Self> {
         // Create the file initially with a temporary filename. We don't know
@@ -420,7 +424,7 @@ impl DeltaLayerWriterInner {
         let mut file = VirtualFile::create(&path, ctx).await?;
         // make room for the header block
         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
-        let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
+        let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
 
         // Initialize the b-tree index builder
         let block_buf = BlockBuf::new();
@@ -628,12 +632,15 @@ impl DeltaLayerWriter {
     ///
     /// Start building a new delta layer.
     ///
+    #[allow(clippy::too_many_arguments)]
     pub async fn new(
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
         tenant_shard_id: TenantShardId,
         key_start: Key,
         lsn_range: Range<Lsn>,
+        gate: &utils::sync::gate::Gate,
+        cancel: CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<Self> {
         Ok(Self {
@@ -644,6 +651,8 @@ impl DeltaLayerWriter {
                 tenant_shard_id,
                 key_start,
                 lsn_range,
+                gate,
+                cancel,
                 ctx,
             )
             .await?,
@@ -1885,6 +1894,8 @@ pub(crate) mod test {
             harness.tenant_shard_id,
             entries_meta.key_range.start,
             entries_meta.lsn_range.clone(),
+            &timeline.gate,
+            timeline.cancel.clone(),
             &ctx,
         )
         .await?;
@@ -2079,6 +2090,8 @@ pub(crate) mod test {
             tenant.tenant_shard_id,
             Key::MIN,
             Lsn(0x11)..truncate_at,
+            &branch.gate,
+            branch.cancel.clone(),
             ctx,
         )
         .await
@@ -2213,6 +2226,8 @@ pub(crate) mod test {
             tenant.tenant_shard_id,
             *key_start,
             (*lsn_min)..lsn_end,
+            &tline.gate,
+            tline.cancel.clone(),
             ctx,
         )
         .await?;
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 72992e5031..3744d615f2 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -48,6 +48,7 @@ use rand::distributions::Alphanumeric;
 use serde::{Deserialize, Serialize};
 use tokio::sync::OnceCell;
 use tokio_stream::StreamExt;
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::bin_ser::BeSer;
 use utils::id::{TenantId, TimelineId};
@@ -748,12 +749,15 @@ impl ImageLayerWriterInner {
     ///
     /// Start building a new image layer.
     ///
+    #[allow(clippy::too_many_arguments)]
     async fn new(
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
         tenant_shard_id: TenantShardId,
         key_range: &Range<Key>,
         lsn: Lsn,
+        gate: &utils::sync::gate::Gate,
+        cancel: CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<Self> {
         // Create the file initially with a temporary filename.
@@ -780,7 +784,7 @@ impl ImageLayerWriterInner {
         };
         // make room for the header block
         file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
-        let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);
+        let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx);
 
         // Initialize the b-tree index builder
         let block_buf = BlockBuf::new();
@@ -988,18 +992,30 @@ impl ImageLayerWriter {
     ///
     /// Start building a new image layer.
     ///
+    #[allow(clippy::too_many_arguments)]
     pub async fn new(
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
         tenant_shard_id: TenantShardId,
         key_range: &Range<Key>,
         lsn: Lsn,
+        gate: &utils::sync::gate::Gate,
+        cancel: CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<Self> {
         Ok(Self {
             inner: Some(
-                ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx)
-                    .await?,
+                ImageLayerWriterInner::new(
+                    conf,
+                    timeline_id,
+                    tenant_shard_id,
+                    key_range,
+                    lsn,
+                    gate,
+                    cancel,
+                    ctx,
+                )
+                .await?,
             ),
         })
     }
@@ -1203,6 +1219,8 @@ mod test {
             harness.tenant_shard_id,
             &range,
             lsn,
+            &timeline.gate,
+            timeline.cancel.clone(),
             &ctx,
         )
         .await
@@ -1268,6 +1286,8 @@ mod test {
             harness.tenant_shard_id,
             &range,
             lsn,
+            &timeline.gate,
+            timeline.cancel.clone(),
             &ctx,
         )
         .await
@@ -1346,6 +1366,8 @@ mod test {
             tenant.tenant_shard_id,
             &key_range,
             lsn,
+            &tline.gate,
+            tline.cancel.clone(),
             ctx,
         )
         .await?;
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 388ed3201c..5d558e66cc 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -719,6 +719,8 @@ impl InMemoryLayer {
         ctx: &RequestContext,
         key_range: Option<Range<Key>>,
         l0_flush_global_state: &l0_flush::Inner,
+        gate: &utils::sync::gate::Gate,
+        cancel: CancellationToken,
     ) -> Result<Option<(PersistentLayerDesc, Utf8PathBuf)>> {
         // Grab the lock in read-mode. We hold it over the I/O, but because this
         // layer is not writeable anymore, no one should be trying to acquire the
@@ -759,6 +761,8 @@ impl InMemoryLayer {
             self.tenant_shard_id,
             Key::MIN,
             self.start_lsn..end_lsn,
+            gate,
+            cancel,
             ctx,
         )
         .await?;
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 8a4a6f4b40..c9ce1d6d8b 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -4805,7 +4805,13 @@ impl Timeline {
         let ctx = ctx.attached_child();
         let work = async move {
             let Some((desc, path)) = frozen_layer
-                .write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner())
+                .write_to_disk(
+                    &ctx,
+                    key_range,
+                    self_clone.l0_flush_global_state.inner(),
+                    &self_clone.gate,
+                    self_clone.cancel.clone(),
+                )
                 .await?
             else {
                 return Ok(None);
             };
@@ -5343,6 +5349,8 @@ impl Timeline {
             self.tenant_shard_id,
             &img_range,
             lsn,
+            &self.gate,
+            self.cancel.clone(),
             ctx,
         )
         .await?;
@@ -6707,6 +6715,8 @@ impl Timeline {
             self.tenant_shard_id,
             &(min_key..end_key),
             lsn,
+            &self.gate,
+            self.cancel.clone(),
             ctx,
         )
         .await?;
@@ -6768,6 +6778,8 @@ impl Timeline {
             self.tenant_shard_id,
             deltas.key_range.start,
             deltas.lsn_range,
+            &self.gate,
+            self.cancel.clone(),
             ctx,
         )
         .await?;
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index c6f0e32494..852c8643f7 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -747,8 +747,8 @@ impl KeyHistoryRetention {
     async fn pipe_to(
         self,
         key: Key,
-        delta_writer: &mut SplitDeltaLayerWriter,
-        mut image_writer: Option<&mut SplitImageLayerWriter>,
+        delta_writer: &mut SplitDeltaLayerWriter<'_>,
+        mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
         stat: &mut CompactionStatistics,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
@@ -1273,6 +1273,8 @@ impl Timeline {
             self.tenant_shard_id,
             &layer.layer_desc().key_range,
             layer.layer_desc().image_layer_lsn(),
+            &self.gate,
+            self.cancel.clone(),
             ctx,
         )
         .await
@@ -1889,6 +1891,8 @@ impl Timeline {
                 debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end);
                 lsn_range.clone()
             },
+            &self.gate,
+            self.cancel.clone(),
             ctx,
         )
         .await
@@ -3083,6 +3087,8 @@ impl Timeline {
             job_desc.compaction_key_range.start,
             lowest_retain_lsn,
             self.get_compaction_target_size(),
+            &self.gate,
+            self.cancel.clone(),
             ctx,
         )
         .await
@@ -3099,6 +3105,8 @@ impl Timeline {
             self.tenant_shard_id,
             lowest_retain_lsn..end_lsn,
             self.get_compaction_target_size(),
+            &self.gate,
+            self.cancel.clone(),
         )
         .await
         .context("failed to create delta layer writer")
@@ -3195,6 +3203,8 @@ impl Timeline {
             self.tenant_shard_id,
             desc.key_range.start,
             desc.lsn_range.clone(),
+            &self.gate,
+            self.cancel.clone(),
             ctx,
         )
         .await
@@ -3212,6 +3222,8 @@ impl Timeline {
             self.tenant_shard_id,
             job_desc.compaction_key_range.end,
             desc.lsn_range.clone(),
+            &self.gate,
+            self.cancel.clone(),
             ctx,
         )
         .await
@@ -3781,6 +3793,8 @@ impl CompactionJobExecutor for TimelineAdaptor {
             self.timeline.tenant_shard_id,
             key_range.start,
             lsn_range.clone(),
+            &self.timeline.gate,
+            self.timeline.cancel.clone(),
             ctx,
         )
         .await?;
@@ -3856,6 +3870,8 @@ impl TimelineAdaptor {
             self.timeline.tenant_shard_id,
             key_range,
             lsn,
+            &self.timeline.gate,
+            self.timeline.cancel.clone(),
             ctx,
         )
         .await?;
diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs
index 1b0d22dc82..fdb6281bff 100644
--- a/pageserver/src/tenant/timeline/detach_ancestor.rs
+++ b/pageserver/src/tenant/timeline/detach_ancestor.rs
@@ -231,6 +231,8 @@ async fn generate_tombstone_image_layer(
         detached.tenant_shard_id,
         &key_range,
         image_lsn,
+        &detached.gate,
+        detached.cancel.clone(),
         ctx,
     )
     .await
@@ -779,6 +781,8 @@ async fn copy_lsn_prefix(
         target_timeline.tenant_shard_id,
         layer.layer_desc().key_range.start,
         layer.layer_desc().lsn_range.start..end_lsn,
+        &target_timeline.gate,
+        target_timeline.cancel.clone(),
         ctx,
     )
     .await
diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
index 3ef82b3658..c6d2944769 100644
--- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
@@ -738,6 +738,8 @@ impl ChunkProcessingJob {
             self.timeline.tenant_shard_id,
             &self.range,
             self.pgdata_lsn,
+            &self.timeline.gate,
+            self.timeline.cancel.clone(),
             ctx,
         )
         .await?;

From 2f0677be2651df82d31b4ba5db8b13d9dc66274e Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Fri, 11 Apr 2025 17:20:20 +0200
Subject: [PATCH 2/2] refactor: delta & image writers perform cleanup on Drop
 in the background

In #10063 we will switch BlobWriter, which underlies the delta and image
layer writers, to use the owned buffers IO buffered writer. That buffered
writer implements double-buffering by virtue of a background task that
performs the flushing -- it owns the VirtualFile, and both DeltaLayerWriter
and ImageLayerWriter are mere clients of it.

The implication is that dropping these client objects no longer guarantees
that all IO activity is complete; we must wait for the flush task to exit.

In preparation for that new world, this PR moves the cleanup into a
short-lived task that is spawned from the Drop impl, and adds the
appropriate gate guards to hook that task into the Timeline lifecycle.

We must (theoretically) worry about a retry that happens in between Drop
completing and the spawned task completing: it could collide with the
randomly generated temporary file name, which may not have been unlinked
yet. We avoid this by switching to a global monotonic counter.

Refs

- extracted from https://github.com/neondatabase/neon/pull/10063
- epic https://github.com/neondatabase/neon/issues/9868
---
 .../src/tenant/storage_layer/delta_layer.rs   | 42 ++++++++++++-------
 .../src/tenant/storage_layer/image_layer.rs   | 40 +++++++++++++-----
 2 files changed, 56 insertions(+), 26 deletions(-)

diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index 4417b8aa51..6a99a62c8a 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -34,6 +34,7 @@ use std::ops::Range;
 use std::os::unix::fs::FileExt;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::sync::atomic::AtomicU64;
 
 use anyhow::{Context, Result, bail, ensure};
 use camino::{Utf8Path, Utf8PathBuf};
@@ -45,8 +46,6 @@ use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::ImageCompressionAlgorithm;
 use pageserver_api::shard::TenantShardId;
 use pageserver_api::value::Value;
-use rand::Rng;
-use rand::distributions::Alphanumeric;
 use serde::{Deserialize, Serialize};
 use tokio::sync::OnceCell;
 use tokio_epoll_uring::IoBuf;
@@ -288,19 +287,19 @@ impl DeltaLayer {
         key_start: Key,
         lsn_range: &Range<Lsn>,
     ) -> Utf8PathBuf {
-        let rand_string: String = rand::thread_rng()
-            .sample_iter(&Alphanumeric)
-            .take(8)
-            .map(char::from)
-            .collect();
+        // Never reuse a filename in the lifetime of a pageserver process so that we need
+        // not worry about a laggard Drop impl's async unlink hitting an already reused filename.
+        static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
+        let filename_disambiguator =
+            NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
 
         conf.timeline_path(tenant_shard_id, timeline_id)
             .join(format!(
-                "{}-XXX__{:016X}-{:016X}.{}.{}",
+                "{}-XXX__{:016X}-{:016X}.{:x}.{}",
                 key_start,
                 u64::from(lsn_range.start),
                 u64::from(lsn_range.end),
-                rand_string,
+                filename_disambiguator,
                 TEMP_FILE_SUFFIX,
             ))
     }
@@ -395,6 +394,8 @@ struct DeltaLayerWriterInner {
 
     // Number of key-lsns in the layer.
     num_keys: usize,
+
+    _gate_guard: utils::sync::gate::GateGuard,
 }
 
 impl DeltaLayerWriterInner {
@@ -439,6 +440,7 @@ impl DeltaLayerWriterInner {
             tree: tree_builder,
             blob_writer,
             num_keys: 0,
+            _gate_guard: gate.enter()?,
         })
     }
@@ -728,12 +730,22 @@ impl DeltaLayerWriter {
 
 impl Drop for DeltaLayerWriter {
     fn drop(&mut self) {
-        if let Some(inner) = self.inner.take() {
-            // We want to remove the virtual file here, so it's fine to not
-            // having completely flushed unwritten data.
-            let vfile = inner.blob_writer.into_inner_no_flush();
+        let Some(inner) = self.inner.take() else {
+            return;
+        };
+
+        tokio::spawn(async move {
+            let DeltaLayerWriterInner {
+                blob_writer,
+                _gate_guard,
+                ..
+            } = inner;
+
+            let vfile = blob_writer.into_inner_no_flush();
             vfile.remove();
-        }
+
+            drop(_gate_guard);
+        });
     }
 }
@@ -1609,8 +1621,8 @@ pub(crate) mod test {
     use bytes::Bytes;
     use itertools::MinMaxResult;
    use pageserver_api::value::Value;
-    use rand::RngCore;
     use rand::prelude::{SeedableRng, SliceRandom, StdRng};
+    use rand::{Rng, RngCore};
 
     use super::*;
     use crate::DEFAULT_PG_VERSION;
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 3744d615f2..552942a42c 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -32,6 +32,7 @@ use std::ops::Range;
 use std::os::unix::prelude::FileExt;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::sync::atomic::AtomicU64;
 
 use anyhow::{Context, Result, bail, ensure};
 use bytes::Bytes;
@@ -43,8 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key};
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::shard::{ShardIdentity, TenantShardId};
 use pageserver_api::value::Value;
-use rand::Rng;
-use rand::distributions::Alphanumeric;
 use serde::{Deserialize, Serialize};
 use tokio::sync::OnceCell;
 use tokio_stream::StreamExt;
@@ -252,14 +251,17 @@ impl ImageLayer {
         tenant_shard_id: TenantShardId,
         fname: &ImageLayerName,
     ) -> Utf8PathBuf {
-        let rand_string: String = rand::thread_rng()
-            .sample_iter(&Alphanumeric)
-            .take(8)
-            .map(char::from)
-            .collect();
+        // Never reuse a filename in the lifetime of a pageserver process so that we need
+        // not worry about a laggard Drop impl's async unlink hitting an already reused filename.
+        static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
+        let filename_disambiguator =
+            NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
 
         conf.timeline_path(&tenant_shard_id, &timeline_id)
-            .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
+            .join(format!(
+                "{fname}.{:x}.{TEMP_FILE_SUFFIX}",
+                filename_disambiguator
+            ))
     }
 
     ///
@@ -743,6 +745,8 @@ struct ImageLayerWriterInner {
 
     #[cfg(feature = "testing")]
     last_written_key: Key,
+
+    _gate_guard: utils::sync::gate::GateGuard,
 }
 
 impl ImageLayerWriterInner {
@@ -805,6 +809,7 @@ impl ImageLayerWriterInner {
             num_keys: 0,
             #[cfg(feature = "testing")]
             last_written_key: Key::MIN,
+            _gate_guard: gate.enter()?,
         };
 
         Ok(writer)
@@ -1066,9 +1071,22 @@ impl ImageLayerWriter {
 
 impl Drop for ImageLayerWriter {
     fn drop(&mut self) {
-        if let Some(inner) = self.inner.take() {
-            inner.blob_writer.into_inner().remove();
-        }
+        let Some(inner) = self.inner.take() else {
+            return;
+        };
+
+        tokio::spawn(async move {
+            let ImageLayerWriterInner {
+                blob_writer,
+                _gate_guard,
+                ..
+            } = inner;
+
+            let vfile = blob_writer.into_inner();
+            vfile.remove();
+
+            drop(_gate_guard);
+        });
    }
 }
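
Note on patch 1 (illustrative, not part of the series): the lifecycle contract
being prepared for is that any background task doing IO on behalf of a timeline
must (a) hold the timeline gate open for as long as it runs and (b) exit
promptly when Timeline::cancel fires. A minimal sketch of such a flush task is
below. The Gate/GateGuard and CancellationToken types are the utils::sync::gate
and tokio_util types used in the diffs; flush_one_buffer is a hypothetical
stand-in for the real flush work.

    use tokio_util::sync::CancellationToken;
    use utils::sync::gate::Gate;

    fn spawn_flush_task(
        gate: &Gate,
        cancel: CancellationToken,
    ) -> anyhow::Result<tokio::task::JoinHandle<()>> {
        // Enter the gate *before* spawning: if the gate is already closed,
        // the timeline is shutting down and no new background work may start.
        let guard = gate.enter()?;
        Ok(tokio::spawn(async move {
            loop {
                tokio::select! {
                    // Respect Timeline::cancel: stop as soon as shutdown begins.
                    _ = cancel.cancelled() => break,
                    res = flush_one_buffer() => {
                        if res.is_err() {
                            break;
                        }
                    }
                }
            }
            // Dropping the guard releases the gate; the timeline's
            // `gate.close()` completes only once all guards are dropped.
            drop(guard);
        }))
    }

    async fn flush_one_buffer() -> anyhow::Result<()> {
        Ok(()) // hypothetical flush work
    }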
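
Note on patch 2 (again a sketch under the same assumptions, not code from this
series): the reason the Drop-spawned cleanup task carries a GateGuard is the
usual timeline shutdown sequence of "cancel, then close the gate, then tear
down on-disk state". Because the spawned unlink holds a guard, `gate.close()`
cannot complete while the unlink is still in flight, so the teardown that
follows never races with it.

    use tokio_util::sync::CancellationToken;
    use utils::sync::gate::Gate;

    async fn timeline_shutdown(gate: &Gate, cancel: &CancellationToken) {
        // 1. Signal all timeline-scoped work (including layer writers) to stop.
        cancel.cancel();
        // 2. Wait until every GateGuard is dropped -- including the guard held
        //    by the short-lived unlink task spawned from the writers' Drop impls.
        gate.close().await;
        // 3. Only now is it safe to, e.g., delete the timeline directory: no
        //    laggard async unlink of a temp file can still be running.
    }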