diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 4417b8aa51..6a99a62c8a 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -34,6 +34,7 @@ use std::ops::Range; use std::os::unix::fs::FileExt; use std::str::FromStr; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use anyhow::{Context, Result, bail, ensure}; use camino::{Utf8Path, Utf8PathBuf}; @@ -45,8 +46,6 @@ use pageserver_api::keyspace::KeySpace; use pageserver_api::models::ImageCompressionAlgorithm; use pageserver_api::shard::TenantShardId; use pageserver_api::value::Value; -use rand::Rng; -use rand::distributions::Alphanumeric; use serde::{Deserialize, Serialize}; use tokio::sync::OnceCell; use tokio_epoll_uring::IoBuf; @@ -288,19 +287,19 @@ impl DeltaLayer { key_start: Key, lsn_range: &Range, ) -> Utf8PathBuf { - let rand_string: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(8) - .map(char::from) - .collect(); + // Never reuse a filename in the lifetime of a pageserver process so that we need + // not worry about laggard Drop impl's async unlink hitting an already reused filename. + static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1); + let filename_disambiguator = + NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed); conf.timeline_path(tenant_shard_id, timeline_id) .join(format!( - "{}-XXX__{:016X}-{:016X}.{}.{}", + "{}-XXX__{:016X}-{:016X}.{:x}.{}", key_start, u64::from(lsn_range.start), u64::from(lsn_range.end), - rand_string, + filename_disambiguator, TEMP_FILE_SUFFIX, )) } @@ -395,6 +394,8 @@ struct DeltaLayerWriterInner { // Number of key-lsns in the layer. num_keys: usize, + + _gate_guard: utils::sync::gate::GateGuard, } impl DeltaLayerWriterInner { @@ -439,6 +440,7 @@ impl DeltaLayerWriterInner { tree: tree_builder, blob_writer, num_keys: 0, + _gate_guard: gate.enter()?, }) } @@ -728,12 +730,22 @@ impl DeltaLayerWriter { impl Drop for DeltaLayerWriter { fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - // We want to remove the virtual file here, so it's fine to not - // having completely flushed unwritten data. - let vfile = inner.blob_writer.into_inner_no_flush(); + let Some(inner) = self.inner.take() else { + return; + }; + + tokio::spawn(async move { + let DeltaLayerWriterInner { + blob_writer, + _gate_guard, + .. + } = inner; + + let vfile = blob_writer.into_inner_no_flush(); vfile.remove(); - } + + drop(_gate_guard); + }); } } @@ -1609,8 +1621,8 @@ pub(crate) mod test { use bytes::Bytes; use itertools::MinMaxResult; use pageserver_api::value::Value; - use rand::RngCore; use rand::prelude::{SeedableRng, SliceRandom, StdRng}; + use rand::{Rng, RngCore}; use super::*; use crate::DEFAULT_PG_VERSION; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 3744d615f2..552942a42c 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -32,6 +32,7 @@ use std::ops::Range; use std::os::unix::prelude::FileExt; use std::str::FromStr; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use anyhow::{Context, Result, bail, ensure}; use bytes::Bytes; @@ -43,8 +44,6 @@ use pageserver_api::key::{DBDIR_KEY, KEY_SIZE, Key}; use pageserver_api::keyspace::KeySpace; use pageserver_api::shard::{ShardIdentity, TenantShardId}; use pageserver_api::value::Value; -use rand::Rng; -use rand::distributions::Alphanumeric; use serde::{Deserialize, Serialize}; use tokio::sync::OnceCell; use tokio_stream::StreamExt; @@ -252,14 +251,17 @@ impl ImageLayer { tenant_shard_id: TenantShardId, fname: &ImageLayerName, ) -> Utf8PathBuf { - let rand_string: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(8) - .map(char::from) - .collect(); + // Never reuse a filename in the lifetime of a pageserver process so that we need + // not worry about laggard Drop impl's async unlink hitting an already reused filename. + static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1); + let filename_disambiguator = + NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed); conf.timeline_path(&tenant_shard_id, &timeline_id) - .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}")) + .join(format!( + "{fname}.{:x}.{TEMP_FILE_SUFFIX}", + filename_disambiguator + )) } /// @@ -743,6 +745,8 @@ struct ImageLayerWriterInner { #[cfg(feature = "testing")] last_written_key: Key, + + _gate_guard: utils::sync::gate::GateGuard, } impl ImageLayerWriterInner { @@ -805,6 +809,7 @@ impl ImageLayerWriterInner { num_keys: 0, #[cfg(feature = "testing")] last_written_key: Key::MIN, + _gate_guard: gate.enter()?, }; Ok(writer) @@ -1066,9 +1071,22 @@ impl ImageLayerWriter { impl Drop for ImageLayerWriter { fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - inner.blob_writer.into_inner().remove(); - } + let Some(inner) = self.inner.take() else { + return; + }; + + tokio::spawn(async move { + let ImageLayerWriterInner { + blob_writer, + _gate_guard, + .. + } = inner; + + let vfile = blob_writer.into_inner(); + vfile.remove(); + + drop(_gate_guard); + }); } }