From daebe50e19bc67c14d019073afd5d6cd433e8246 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 14 Apr 2025 13:51:01 +0200 Subject: [PATCH] refactor: plumb gate and cancellation down to blob_io::BlobWriter (#11543) In #10063 we will switch BlobWriter to use the owned buffers IO buffered writer, which implements double-buffering by virtue of a background task that performs the flushing. That task's lifecycle must be contained within the Timeline lifecycle, so it must hold the timeline gate open and respect Timeline::cancel. This PR does the noisy plumbing up front to reduce the size of the #10063 diff. Refs - extracted from https://github.com/neondatabase/neon/pull/10063 - epic https://github.com/neondatabase/neon/issues/9868 --- pageserver/benches/bench_ingest.rs | 2 +- pageserver/src/tenant/blob_io.rs | 13 +++++- .../storage_layer/batch_split_writer.rs | 44 +++++++++++++++++-- .../src/tenant/storage_layer/delta_layer.rs | 17 ++++++- .../src/tenant/storage_layer/image_layer.rs | 28 ++++++++++-- .../tenant/storage_layer/inmemory_layer.rs | 4 ++ pageserver/src/tenant/timeline.rs | 14 +++++- pageserver/src/tenant/timeline/compaction.rs | 20 ++++++++- .../src/tenant/timeline/detach_ancestor.rs | 4 ++ .../src/tenant/timeline/import_pgdata/flow.rs | 2 + 10 files changed, 134 insertions(+), 14 deletions(-) diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs index 000938b189..3108b5351f 100644 --- a/pageserver/benches/bench_ingest.rs +++ b/pageserver/benches/bench_ingest.rs @@ -126,7 +126,7 @@ async fn ingest( max_concurrency: NonZeroUsize::new(1).unwrap(), }); let (_desc, path) = layer - .write_to_disk(&ctx, None, l0_flush_state.inner()) + .write_to_disk(&ctx, None, l0_flush_state.inner(), &gate, cancel.clone()) .await? .unwrap(); tokio::fs::remove_file(path).await?; diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index b0b2a16c2f..abeaa166a4 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -22,6 +22,7 @@ use bytes::{BufMut, BytesMut}; use pageserver_api::models::ImageCompressionAlgorithm; use tokio::io::AsyncWriteExt; use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; +use tokio_util::sync::CancellationToken; use tracing::warn; use crate::context::RequestContext; @@ -169,7 +170,13 @@ pub struct BlobWriter { } impl BlobWriter { - pub fn new(inner: VirtualFile, start_offset: u64) -> Self { + pub fn new( + inner: VirtualFile, + start_offset: u64, + _gate: &utils::sync::gate::Gate, + _cancel: CancellationToken, + _ctx: &RequestContext, + ) -> Self { Self { inner, offset: start_offset, @@ -432,12 +439,14 @@ pub(crate) mod tests { ) -> Result<(Utf8TempDir, Utf8PathBuf, Vec), Error> { let temp_dir = camino_tempfile::tempdir()?; let pathbuf = temp_dir.path().join("file"); + let gate = utils::sync::gate::Gate::default(); + let cancel = CancellationToken::new(); // Write part (in block to drop the file) let mut offsets = Vec::new(); { let file = VirtualFile::create(pathbuf.as_path(), ctx).await?; - let mut wtr = BlobWriter::::new(file, 0); + let mut wtr = BlobWriter::::new(file, 0, &gate, cancel.clone(), ctx); for blob in blobs.iter() { let (_, res) = if compression { let res = wtr diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs index 29ada15c36..39cd02d101 100644 --- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs +++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs @@ -5,6 +5,7 @@ use std::sync::Arc;
use bytes::Bytes; use pageserver_api::key::{KEY_SIZE, Key}; use pageserver_api::value::Value; +use tokio_util::sync::CancellationToken; use utils::id::TimelineId; use utils::lsn::Lsn; use utils::shard::TenantShardId; @@ -179,7 +180,7 @@ impl BatchLayerWriter { /// An image writer that takes images and produces multiple image layers. #[must_use] -pub struct SplitImageLayerWriter { +pub struct SplitImageLayerWriter<'a> { inner: ImageLayerWriter, target_layer_size: u64, lsn: Lsn, @@ -188,9 +189,12 @@ pub struct SplitImageLayerWriter { tenant_shard_id: TenantShardId, batches: BatchLayerWriter, start_key: Key, + gate: &'a utils::sync::gate::Gate, + cancel: CancellationToken, } -impl SplitImageLayerWriter { +impl<'a> SplitImageLayerWriter<'a> { + #[allow(clippy::too_many_arguments)] pub async fn new( conf: &'static PageServerConf, timeline_id: TimelineId, @@ -198,6 +202,8 @@ impl SplitImageLayerWriter { start_key: Key, lsn: Lsn, target_layer_size: u64, + gate: &'a utils::sync::gate::Gate, + cancel: CancellationToken, ctx: &RequestContext, ) -> anyhow::Result { Ok(Self { @@ -208,6 +214,8 @@ impl SplitImageLayerWriter { tenant_shard_id, &(start_key..Key::MAX), lsn, + gate, + cancel.clone(), ctx, ) .await?, @@ -217,6 +225,8 @@ impl SplitImageLayerWriter { batches: BatchLayerWriter::new(conf).await?, lsn, start_key, + gate, + cancel, }) } @@ -239,6 +249,8 @@ impl SplitImageLayerWriter { self.tenant_shard_id, &(key..Key::MAX), self.lsn, + self.gate, + self.cancel.clone(), ctx, ) .await?; @@ -291,7 +303,7 @@ impl SplitImageLayerWriter { /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm /// will split them into multiple files based on size. #[must_use] -pub struct SplitDeltaLayerWriter { +pub struct SplitDeltaLayerWriter<'a> { inner: Option<(Key, DeltaLayerWriter)>, target_layer_size: u64, conf: &'static PageServerConf, @@ -300,15 +312,19 @@ pub struct SplitDeltaLayerWriter { lsn_range: Range, last_key_written: Key, batches: BatchLayerWriter, + gate: &'a utils::sync::gate::Gate, + cancel: CancellationToken, } -impl SplitDeltaLayerWriter { +impl<'a> SplitDeltaLayerWriter<'a> { pub async fn new( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, lsn_range: Range, target_layer_size: u64, + gate: &'a utils::sync::gate::Gate, + cancel: CancellationToken, ) -> anyhow::Result { Ok(Self { target_layer_size, @@ -319,6 +335,8 @@ impl SplitDeltaLayerWriter { lsn_range, last_key_written: Key::MIN, batches: BatchLayerWriter::new(conf).await?, + gate, + cancel, }) } @@ -344,6 +362,8 @@ impl SplitDeltaLayerWriter { self.tenant_shard_id, key, self.lsn_range.clone(), + self.gate, + self.cancel.clone(), ctx, ) .await?, @@ -362,6 +382,8 @@ impl SplitDeltaLayerWriter { self.tenant_shard_id, key, self.lsn_range.clone(), + self.gate, + self.cancel.clone(), ctx, ) .await?; @@ -469,6 +491,8 @@ mod tests { get_key(0), Lsn(0x18), 4 * 1024 * 1024, + &tline.gate, + tline.cancel.clone(), &ctx, ) .await @@ -480,6 +504,8 @@ mod tests { tenant.tenant_shard_id, Lsn(0x18)..Lsn(0x20), 4 * 1024 * 1024, + &tline.gate, + tline.cancel.clone(), ) .await .unwrap(); @@ -546,6 +572,8 @@ mod tests { get_key(0), Lsn(0x18), 4 * 1024 * 1024, + &tline.gate, + tline.cancel.clone(), &ctx, ) .await @@ -556,6 +584,8 @@ mod tests { tenant.tenant_shard_id, Lsn(0x18)..Lsn(0x20), 4 * 1024 * 1024, + &tline.gate, + tline.cancel.clone(), ) .await .unwrap(); @@ -643,6 +673,8 @@ mod tests { get_key(0), Lsn(0x18), 4 * 1024, + &tline.gate, + tline.cancel.clone(), 
&ctx, ) .await @@ -654,6 +686,8 @@ mod tests { tenant.tenant_shard_id, Lsn(0x18)..Lsn(0x20), 4 * 1024, + &tline.gate, + tline.cancel.clone(), ) .await .unwrap(); @@ -730,6 +764,8 @@ mod tests { tenant.tenant_shard_id, Lsn(0x10)..Lsn(N as u64 * 16 + 0x10), 4 * 1024 * 1024, + &tline.gate, + tline.cancel.clone(), ) .await .unwrap(); diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 05b0bc1a5c..4417b8aa51 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -50,6 +50,7 @@ use rand::distributions::Alphanumeric; use serde::{Deserialize, Serialize}; use tokio::sync::OnceCell; use tokio_epoll_uring::IoBuf; +use tokio_util::sync::CancellationToken; use tracing::*; use utils::bin_ser::BeSer; use utils::id::{TenantId, TimelineId}; @@ -400,12 +401,15 @@ impl DeltaLayerWriterInner { /// /// Start building a new delta layer. /// + #[allow(clippy::too_many_arguments)] async fn new( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, key_start: Key, lsn_range: Range, + gate: &utils::sync::gate::Gate, + cancel: CancellationToken, ctx: &RequestContext, ) -> anyhow::Result { // Create the file initially with a temporary filename. We don't know @@ -420,7 +424,7 @@ impl DeltaLayerWriterInner { let mut file = VirtualFile::create(&path, ctx).await?; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; - let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); + let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx); // Initialize the b-tree index builder let block_buf = BlockBuf::new(); @@ -628,12 +632,15 @@ impl DeltaLayerWriter { /// /// Start building a new delta layer. /// + #[allow(clippy::too_many_arguments)] pub async fn new( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, key_start: Key, lsn_range: Range, + gate: &utils::sync::gate::Gate, + cancel: CancellationToken, ctx: &RequestContext, ) -> anyhow::Result { Ok(Self { @@ -644,6 +651,8 @@ impl DeltaLayerWriter { tenant_shard_id, key_start, lsn_range, + gate, + cancel, ctx, ) .await?, @@ -1885,6 +1894,8 @@ pub(crate) mod test { harness.tenant_shard_id, entries_meta.key_range.start, entries_meta.lsn_range.clone(), + &timeline.gate, + timeline.cancel.clone(), &ctx, ) .await?; @@ -2079,6 +2090,8 @@ pub(crate) mod test { tenant.tenant_shard_id, Key::MIN, Lsn(0x11)..truncate_at, + &branch.gate, + branch.cancel.clone(), ctx, ) .await @@ -2213,6 +2226,8 @@ pub(crate) mod test { tenant.tenant_shard_id, *key_start, (*lsn_min)..lsn_end, + &tline.gate, + tline.cancel.clone(), ctx, ) .await?; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 72992e5031..3744d615f2 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -48,6 +48,7 @@ use rand::distributions::Alphanumeric; use serde::{Deserialize, Serialize}; use tokio::sync::OnceCell; use tokio_stream::StreamExt; +use tokio_util::sync::CancellationToken; use tracing::*; use utils::bin_ser::BeSer; use utils::id::{TenantId, TimelineId}; @@ -748,12 +749,15 @@ impl ImageLayerWriterInner { /// /// Start building a new image layer. 
/// + #[allow(clippy::too_many_arguments)] async fn new( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, key_range: &Range, lsn: Lsn, + gate: &utils::sync::gate::Gate, + cancel: CancellationToken, ctx: &RequestContext, ) -> anyhow::Result { // Create the file initially with a temporary filename. @@ -780,7 +784,7 @@ impl ImageLayerWriterInner { }; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; - let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); + let blob_writer = BlobWriter::new(file, PAGE_SZ as u64, gate, cancel, ctx); // Initialize the b-tree index builder let block_buf = BlockBuf::new(); @@ -988,18 +992,30 @@ impl ImageLayerWriter { /// /// Start building a new image layer. /// + #[allow(clippy::too_many_arguments)] pub async fn new( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, key_range: &Range, lsn: Lsn, + gate: &utils::sync::gate::Gate, + cancel: CancellationToken, ctx: &RequestContext, ) -> anyhow::Result { Ok(Self { inner: Some( - ImageLayerWriterInner::new(conf, timeline_id, tenant_shard_id, key_range, lsn, ctx) - .await?, + ImageLayerWriterInner::new( + conf, + timeline_id, + tenant_shard_id, + key_range, + lsn, + gate, + cancel, + ctx, + ) + .await?, ), }) } @@ -1203,6 +1219,8 @@ mod test { harness.tenant_shard_id, &range, lsn, + &timeline.gate, + timeline.cancel.clone(), &ctx, ) .await @@ -1268,6 +1286,8 @@ mod test { harness.tenant_shard_id, &range, lsn, + &timeline.gate, + timeline.cancel.clone(), &ctx, ) .await @@ -1346,6 +1366,8 @@ mod test { tenant.tenant_shard_id, &key_range, lsn, + &tline.gate, + tline.cancel.clone(), ctx, ) .await?; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 388ed3201c..5d558e66cc 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -719,6 +719,8 @@ impl InMemoryLayer { ctx: &RequestContext, key_range: Option>, l0_flush_global_state: &l0_flush::Inner, + gate: &utils::sync::gate::Gate, + cancel: CancellationToken, ) -> Result> { // Grab the lock in read-mode. We hold it over the I/O, but because this // layer is not writeable anymore, no one should be trying to acquire the @@ -759,6 +761,8 @@ impl InMemoryLayer { self.tenant_shard_id, Key::MIN, self.start_lsn..end_lsn, + gate, + cancel, ctx, ) .await?; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7d4eb0cd82..204bdb5eee 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4986,7 +4986,13 @@ impl Timeline { let ctx = ctx.attached_child(); let work = async move { let Some((desc, path)) = frozen_layer - .write_to_disk(&ctx, key_range, self_clone.l0_flush_global_state.inner()) + .write_to_disk( + &ctx, + key_range, + self_clone.l0_flush_global_state.inner(), + &self_clone.gate, + self_clone.cancel.clone(), + ) .await? 
else { return Ok(None); @@ -5526,6 +5532,8 @@ impl Timeline { self.tenant_shard_id, &img_range, lsn, + &self.gate, + self.cancel.clone(), ctx, ) .await?; @@ -6890,6 +6898,8 @@ impl Timeline { self.tenant_shard_id, &(min_key..end_key), lsn, + &self.gate, + self.cancel.clone(), ctx, ) .await?; @@ -6951,6 +6961,8 @@ impl Timeline { self.tenant_shard_id, deltas.key_range.start, deltas.lsn_range, + &self.gate, + self.cancel.clone(), ctx, ) .await?; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index e3aa5045bb..91cc8ca10c 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -749,8 +749,8 @@ impl KeyHistoryRetention { async fn pipe_to( self, key: Key, - delta_writer: &mut SplitDeltaLayerWriter, - mut image_writer: Option<&mut SplitImageLayerWriter>, + delta_writer: &mut SplitDeltaLayerWriter<'_>, + mut image_writer: Option<&mut SplitImageLayerWriter<'_>>, stat: &mut CompactionStatistics, ctx: &RequestContext, ) -> anyhow::Result<()> { @@ -1394,6 +1394,8 @@ impl Timeline { self.tenant_shard_id, &layer.layer_desc().key_range, layer.layer_desc().image_layer_lsn(), + &self.gate, + self.cancel.clone(), ctx, ) .await @@ -2033,6 +2035,8 @@ impl Timeline { debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); lsn_range.clone() }, + &self.gate, + self.cancel.clone(), ctx, ) .await @@ -3232,6 +3236,8 @@ impl Timeline { job_desc.compaction_key_range.start, lowest_retain_lsn, self.get_compaction_target_size(), + &self.gate, + self.cancel.clone(), ctx, ) .await @@ -3248,6 +3254,8 @@ impl Timeline { self.tenant_shard_id, lowest_retain_lsn..end_lsn, self.get_compaction_target_size(), + &self.gate, + self.cancel.clone(), ) .await .context("failed to create delta layer writer") @@ -3344,6 +3352,8 @@ impl Timeline { self.tenant_shard_id, desc.key_range.start, desc.lsn_range.clone(), + &self.gate, + self.cancel.clone(), ctx, ) .await @@ -3361,6 +3371,8 @@ impl Timeline { self.tenant_shard_id, job_desc.compaction_key_range.end, desc.lsn_range.clone(), + &self.gate, + self.cancel.clone(), ctx, ) .await @@ -3932,6 +3944,8 @@ impl CompactionJobExecutor for TimelineAdaptor { self.timeline.tenant_shard_id, key_range.start, lsn_range.clone(), + &self.timeline.gate, + self.timeline.cancel.clone(), ctx, ) .await?; @@ -4007,6 +4021,8 @@ impl TimelineAdaptor { self.timeline.tenant_shard_id, key_range, lsn, + &self.timeline.gate, + self.timeline.cancel.clone(), ctx, ) .await?; diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 7c61f32d1e..a841cc55f0 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -228,6 +228,8 @@ async fn generate_tombstone_image_layer( detached.tenant_shard_id, &key_range, image_lsn, + &detached.gate, + detached.cancel.clone(), ctx, ) .await @@ -776,6 +778,8 @@ async fn copy_lsn_prefix( target_timeline.tenant_shard_id, layer.layer_desc().key_range.start, layer.layer_desc().lsn_range.start..end_lsn, + &target_timeline.gate, + target_timeline.cancel.clone(), ctx, ) .await diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs index 3ef82b3658..c6d2944769 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -738,6 +738,8 @@ impl ChunkProcessingJob { self.timeline.tenant_shard_id, &self.range, 
self.pgdata_lsn, + &self.timeline.gate, + self.timeline.cancel.clone(), ctx, ) .await?;
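The pattern this plumbing prepares for is a background flush task whose lifetime is bounded by the Timeline (via the gate) and whose work is interruptible (via Timeline::cancel). The following is a minimal sketch of that pattern, not the pageserver's actual BlobWriter or owned-buffers-IO code; it assumes only tokio and tokio-util as dependencies and uses tokio_util::task::TaskTracker as a stand-in for utils::sync::gate::Gate.

use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;

// Background flush task: drains full buffers handed over by the writer
// (double-buffering: the writer keeps filling the other buffer meanwhile).
// It exits when the writer hangs up or when cancellation is requested.
async fn flush_task(mut rx: mpsc::Receiver<Vec<u8>>, cancel: CancellationToken) {
    loop {
        tokio::select! {
            // Respect Timeline::cancel: stop promptly once shutdown begins.
            _ = cancel.cancelled() => break,
            maybe_buf = rx.recv() => match maybe_buf {
                // Placeholder for the actual write of a full buffer to disk.
                Some(buf) => println!("flushing {} bytes", buf.len()),
                // Writer side dropped; nothing more will arrive.
                None => break,
            },
        }
    }
}

#[tokio::main]
async fn main() {
    // Per-timeline shutdown primitives, analogous to Timeline::cancel and the
    // timeline gate.
    let cancel = CancellationToken::new();
    let gate = TaskTracker::new();

    // Capacity 1: at most one full buffer is in flight while the writer fills
    // the next one.
    let (tx, rx) = mpsc::channel::<Vec<u8>>(1);

    // Spawning through the tracker plays the role of entering the gate:
    // the shutdown sequence below waits for this task before the "timeline"
    // is considered gone.
    gate.spawn(flush_task(rx, cancel.clone()));

    // Writer side: hand over a few full buffers.
    for i in 0..3u8 {
        tx.send(vec![i; 8192]).await.expect("flush task is alive");
    }
    drop(tx); // writer is done

    // "Timeline shutdown": request cancellation, close the gate, wait for the
    // flush task to finish. No background task can outlive this point.
    cancel.cancel();
    gate.close();
    gate.wait().await;
    println!("shut down with no flush task left running");
}

The reason both primitives are already passed down to BlobWriter::new in this PR, even though they are still unused there, is that the future flush task needs them at construction time: the cancellation token makes shutdown prompt, while the gate makes Timeline shutdown wait for the task instead of racing it.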