From a929e7a8443f48c49ab690ec609ec98414580003 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 11 Apr 2025 16:35:23 +0200 Subject: [PATCH] gate & cancel propagation: make it less invasive - store reference to gate - store CancellationToken clone --- .../storage_layer/batch_split_writer.rs | 68 +++++++------------ pageserver/src/tenant/timeline/compaction.rs | 18 +++-- 2 files changed, 33 insertions(+), 53 deletions(-) diff --git a/pageserver/src/tenant/storage_layer/batch_split_writer.rs b/pageserver/src/tenant/storage_layer/batch_split_writer.rs index 7d84a9a80d..39cd02d101 100644 --- a/pageserver/src/tenant/storage_layer/batch_split_writer.rs +++ b/pageserver/src/tenant/storage_layer/batch_split_writer.rs @@ -180,7 +180,7 @@ impl BatchLayerWriter { /// An image writer that takes images and produces multiple image layers. #[must_use] -pub struct SplitImageLayerWriter { +pub struct SplitImageLayerWriter<'a> { inner: ImageLayerWriter, target_layer_size: u64, lsn: Lsn, @@ -189,10 +189,11 @@ pub struct SplitImageLayerWriter { tenant_shard_id: TenantShardId, batches: BatchLayerWriter, start_key: Key, + gate: &'a utils::sync::gate::Gate, cancel: CancellationToken, } -impl SplitImageLayerWriter { +impl<'a> SplitImageLayerWriter<'a> { #[allow(clippy::too_many_arguments)] pub async fn new( conf: &'static PageServerConf, @@ -201,7 +202,7 @@ impl SplitImageLayerWriter { start_key: Key, lsn: Lsn, target_layer_size: u64, - gate: &utils::sync::gate::Gate, + gate: &'a utils::sync::gate::Gate, cancel: CancellationToken, ctx: &RequestContext, ) -> anyhow::Result { @@ -224,6 +225,7 @@ impl SplitImageLayerWriter { batches: BatchLayerWriter::new(conf).await?, lsn, start_key, + gate, cancel, }) } @@ -232,7 +234,6 @@ impl SplitImageLayerWriter { &mut self, key: Key, img: Bytes, - gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result<()> { // The current estimation is an upper bound of the space that the key/image could take @@ -248,7 +249,7 @@ impl SplitImageLayerWriter { self.tenant_shard_id, &(key..Key::MAX), self.lsn, - gate, + self.gate, self.cancel.clone(), ctx, ) @@ -302,7 +303,7 @@ impl SplitImageLayerWriter { /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm /// will split them into multiple files based on size. #[must_use] -pub struct SplitDeltaLayerWriter { +pub struct SplitDeltaLayerWriter<'a> { inner: Option<(Key, DeltaLayerWriter)>, target_layer_size: u64, conf: &'static PageServerConf, @@ -311,16 +312,18 @@ pub struct SplitDeltaLayerWriter { lsn_range: Range, last_key_written: Key, batches: BatchLayerWriter, + gate: &'a utils::sync::gate::Gate, cancel: CancellationToken, } -impl SplitDeltaLayerWriter { +impl<'a> SplitDeltaLayerWriter<'a> { pub async fn new( conf: &'static PageServerConf, timeline_id: TimelineId, tenant_shard_id: TenantShardId, lsn_range: Range, target_layer_size: u64, + gate: &'a utils::sync::gate::Gate, cancel: CancellationToken, ) -> anyhow::Result { Ok(Self { @@ -332,6 +335,7 @@ impl SplitDeltaLayerWriter { lsn_range, last_key_written: Key::MIN, batches: BatchLayerWriter::new(conf).await?, + gate, cancel, }) } @@ -341,7 +345,6 @@ impl SplitDeltaLayerWriter { key: Key, lsn: Lsn, val: Value, - gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result<()> { // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate @@ -359,7 +362,7 @@ impl SplitDeltaLayerWriter { self.tenant_shard_id, key, self.lsn_range.clone(), - gate, + self.gate, self.cancel.clone(), ctx, ) @@ -379,7 +382,7 @@ impl SplitDeltaLayerWriter { self.tenant_shard_id, key, self.lsn_range.clone(), - gate, + self.gate, self.cancel.clone(), ctx, ) @@ -501,13 +504,14 @@ mod tests { tenant.tenant_shard_id, Lsn(0x18)..Lsn(0x20), 4 * 1024 * 1024, + &tline.gate, tline.cancel.clone(), ) .await .unwrap(); image_writer - .put_image(get_key(0), get_img(0), &tline.gate, &ctx) + .put_image(get_key(0), get_img(0), &ctx) .await .unwrap(); let layers = image_writer @@ -517,13 +521,7 @@ mod tests { assert_eq!(layers.len(), 1); delta_writer - .put_value( - get_key(0), - Lsn(0x18), - Value::Image(get_img(0)), - &tline.gate, - &ctx, - ) + .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx) .await .unwrap(); let layers = delta_writer.finish(&tline, &ctx).await.unwrap(); @@ -586,6 +584,7 @@ mod tests { tenant.tenant_shard_id, Lsn(0x18)..Lsn(0x20), 4 * 1024 * 1024, + &tline.gate, tline.cancel.clone(), ) .await @@ -594,17 +593,11 @@ mod tests { for i in 0..N { let i = i as u32; image_writer - .put_image(get_key(i), get_large_img(), &tline.gate, &ctx) + .put_image(get_key(i), get_large_img(), &ctx) .await .unwrap(); delta_writer - .put_value( - get_key(i), - Lsn(0x20), - Value::Image(get_large_img()), - &tline.gate, - &ctx, - ) + .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx) .await .unwrap(); } @@ -693,17 +686,18 @@ mod tests { tenant.tenant_shard_id, Lsn(0x18)..Lsn(0x20), 4 * 1024, + &tline.gate, tline.cancel.clone(), ) .await .unwrap(); image_writer - .put_image(get_key(0), get_img(0), &tline.gate, &ctx) + .put_image(get_key(0), get_img(0), &ctx) .await .unwrap(); image_writer - .put_image(get_key(1), get_large_img(), &tline.gate, &ctx) + .put_image(get_key(1), get_large_img(), &ctx) .await .unwrap(); let layers = image_writer @@ -713,23 +707,11 @@ mod tests { assert_eq!(layers.len(), 2); delta_writer - .put_value( - get_key(0), - Lsn(0x18), - Value::Image(get_img(0)), - &tline.gate, - &ctx, - ) + .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx) .await .unwrap(); delta_writer - .put_value( - get_key(1), - Lsn(0x1A), - Value::Image(get_large_img()), - &tline.gate, - &ctx, - ) + .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx) .await .unwrap(); let layers = delta_writer.finish(&tline, &ctx).await.unwrap(); @@ -782,6 +764,7 @@ mod tests { tenant.tenant_shard_id, Lsn(0x10)..Lsn(N as u64 * 16 + 0x10), 4 * 1024 * 1024, + &tline.gate, tline.cancel.clone(), ) .await @@ -794,7 +777,6 @@ mod tests { get_key(0), Lsn(i as u64 * 16 + 0x10), Value::Image(get_large_img()), - &tline.gate, &ctx, ) .await diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 3b38712b0b..852c8643f7 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -747,10 +747,9 @@ impl KeyHistoryRetention { async fn pipe_to( self, key: Key, - delta_writer: &mut SplitDeltaLayerWriter, - mut image_writer: Option<&mut SplitImageLayerWriter>, + delta_writer: &mut SplitDeltaLayerWriter<'_>, + mut image_writer: Option<&mut SplitImageLayerWriter<'_>>, stat: &mut CompactionStatistics, - gate: &utils::sync::gate::Gate, ctx: &RequestContext, ) -> anyhow::Result<()> { let mut first_batch = true; @@ -762,30 +761,30 @@ impl KeyHistoryRetention { }; stat.produce_image_key(img); if let Some(image_writer) = image_writer.as_mut() { - image_writer.put_image(key, img.clone(), gate, ctx).await?; + image_writer.put_image(key, img.clone(), ctx).await?; } else { delta_writer - .put_value(key, cutoff_lsn, Value::Image(img.clone()), gate, ctx) + .put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx) .await?; } } else { for (lsn, val) in logs { stat.produce_key(&val); - delta_writer.put_value(key, lsn, val, gate, ctx).await?; + delta_writer.put_value(key, lsn, val, ctx).await?; } } first_batch = false; } else { for (lsn, val) in logs { stat.produce_key(&val); - delta_writer.put_value(key, lsn, val, gate, ctx).await?; + delta_writer.put_value(key, lsn, val, ctx).await?; } } } let KeyLogAtLsn(above_horizon_logs) = self.above_horizon; for (lsn, val) in above_horizon_logs { stat.produce_key(&val); - delta_writer.put_value(key, lsn, val, gate, ctx).await?; + delta_writer.put_value(key, lsn, val, ctx).await?; } Ok(()) } @@ -3106,6 +3105,7 @@ impl Timeline { self.tenant_shard_id, lowest_retain_lsn..end_lsn, self.get_compaction_target_size(), + &self.gate, self.cancel.clone(), ) .await @@ -3275,7 +3275,6 @@ impl Timeline { &mut delta_layer_writer, image_layer_writer.as_mut(), &mut stat, - &self.gate, ctx, ) .await @@ -3316,7 +3315,6 @@ impl Timeline { &mut delta_layer_writer, image_layer_writer.as_mut(), &mut stat, - &self.gate, ctx, ) .await