gate & cancel propagation: make it less invasive

- store reference to gate
- store CancellationToken clone
This commit is contained in:
Christian Schwarz
2025-04-11 16:35:23 +02:00
parent 3f417d4ac8
commit a929e7a844
2 changed files with 33 additions and 53 deletions

View File

@@ -180,7 +180,7 @@ impl BatchLayerWriter {
/// An image writer that takes images and produces multiple image layers.
#[must_use]
pub struct SplitImageLayerWriter {
pub struct SplitImageLayerWriter<'a> {
inner: ImageLayerWriter,
target_layer_size: u64,
lsn: Lsn,
@@ -189,10 +189,11 @@ pub struct SplitImageLayerWriter {
tenant_shard_id: TenantShardId,
batches: BatchLayerWriter,
start_key: Key,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
}
impl SplitImageLayerWriter {
impl<'a> SplitImageLayerWriter<'a> {
#[allow(clippy::too_many_arguments)]
pub async fn new(
conf: &'static PageServerConf,
@@ -201,7 +202,7 @@ impl SplitImageLayerWriter {
start_key: Key,
lsn: Lsn,
target_layer_size: u64,
gate: &utils::sync::gate::Gate,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
@@ -224,6 +225,7 @@ impl SplitImageLayerWriter {
batches: BatchLayerWriter::new(conf).await?,
lsn,
start_key,
gate,
cancel,
})
}
@@ -232,7 +234,6 @@ impl SplitImageLayerWriter {
&mut self,
key: Key,
img: Bytes,
gate: &utils::sync::gate::Gate,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// The current estimation is an upper bound of the space that the key/image could take
@@ -248,7 +249,7 @@ impl SplitImageLayerWriter {
self.tenant_shard_id,
&(key..Key::MAX),
self.lsn,
gate,
self.gate,
self.cancel.clone(),
ctx,
)
@@ -302,7 +303,7 @@ impl SplitImageLayerWriter {
/// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
/// will split them into multiple files based on size.
#[must_use]
pub struct SplitDeltaLayerWriter {
pub struct SplitDeltaLayerWriter<'a> {
inner: Option<(Key, DeltaLayerWriter)>,
target_layer_size: u64,
conf: &'static PageServerConf,
@@ -311,16 +312,18 @@ pub struct SplitDeltaLayerWriter {
lsn_range: Range<Lsn>,
last_key_written: Key,
batches: BatchLayerWriter,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
}
impl SplitDeltaLayerWriter {
impl<'a> SplitDeltaLayerWriter<'a> {
pub async fn new(
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
target_layer_size: u64,
gate: &'a utils::sync::gate::Gate,
cancel: CancellationToken,
) -> anyhow::Result<Self> {
Ok(Self {
@@ -332,6 +335,7 @@ impl SplitDeltaLayerWriter {
lsn_range,
last_key_written: Key::MIN,
batches: BatchLayerWriter::new(conf).await?,
gate,
cancel,
})
}
@@ -341,7 +345,6 @@ impl SplitDeltaLayerWriter {
key: Key,
lsn: Lsn,
val: Value,
gate: &utils::sync::gate::Gate,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
@@ -359,7 +362,7 @@ impl SplitDeltaLayerWriter {
self.tenant_shard_id,
key,
self.lsn_range.clone(),
gate,
self.gate,
self.cancel.clone(),
ctx,
)
@@ -379,7 +382,7 @@ impl SplitDeltaLayerWriter {
self.tenant_shard_id,
key,
self.lsn_range.clone(),
gate,
self.gate,
self.cancel.clone(),
ctx,
)
@@ -501,13 +504,14 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
image_writer
.put_image(get_key(0), get_img(0), &tline.gate, &ctx)
.put_image(get_key(0), get_img(0), &ctx)
.await
.unwrap();
let layers = image_writer
@@ -517,13 +521,7 @@ mod tests {
assert_eq!(layers.len(), 1);
delta_writer
.put_value(
get_key(0),
Lsn(0x18),
Value::Image(get_img(0)),
&tline.gate,
&ctx,
)
.put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
@@ -586,6 +584,7 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
@@ -594,17 +593,11 @@ mod tests {
for i in 0..N {
let i = i as u32;
image_writer
.put_image(get_key(i), get_large_img(), &tline.gate, &ctx)
.put_image(get_key(i), get_large_img(), &ctx)
.await
.unwrap();
delta_writer
.put_value(
get_key(i),
Lsn(0x20),
Value::Image(get_large_img()),
&tline.gate,
&ctx,
)
.put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
.await
.unwrap();
}
@@ -693,17 +686,18 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x18)..Lsn(0x20),
4 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
.unwrap();
image_writer
.put_image(get_key(0), get_img(0), &tline.gate, &ctx)
.put_image(get_key(0), get_img(0), &ctx)
.await
.unwrap();
image_writer
.put_image(get_key(1), get_large_img(), &tline.gate, &ctx)
.put_image(get_key(1), get_large_img(), &ctx)
.await
.unwrap();
let layers = image_writer
@@ -713,23 +707,11 @@ mod tests {
assert_eq!(layers.len(), 2);
delta_writer
.put_value(
get_key(0),
Lsn(0x18),
Value::Image(get_img(0)),
&tline.gate,
&ctx,
)
.put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
.await
.unwrap();
delta_writer
.put_value(
get_key(1),
Lsn(0x1A),
Value::Image(get_large_img()),
&tline.gate,
&ctx,
)
.put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
@@ -782,6 +764,7 @@ mod tests {
tenant.tenant_shard_id,
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
4 * 1024 * 1024,
&tline.gate,
tline.cancel.clone(),
)
.await
@@ -794,7 +777,6 @@ mod tests {
get_key(0),
Lsn(i as u64 * 16 + 0x10),
Value::Image(get_large_img()),
&tline.gate,
&ctx,
)
.await

View File

@@ -747,10 +747,9 @@ impl KeyHistoryRetention {
async fn pipe_to(
self,
key: Key,
delta_writer: &mut SplitDeltaLayerWriter,
mut image_writer: Option<&mut SplitImageLayerWriter>,
delta_writer: &mut SplitDeltaLayerWriter<'_>,
mut image_writer: Option<&mut SplitImageLayerWriter<'_>>,
stat: &mut CompactionStatistics,
gate: &utils::sync::gate::Gate,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut first_batch = true;
@@ -762,30 +761,30 @@ impl KeyHistoryRetention {
};
stat.produce_image_key(img);
if let Some(image_writer) = image_writer.as_mut() {
image_writer.put_image(key, img.clone(), gate, ctx).await?;
image_writer.put_image(key, img.clone(), ctx).await?;
} else {
delta_writer
.put_value(key, cutoff_lsn, Value::Image(img.clone()), gate, ctx)
.put_value(key, cutoff_lsn, Value::Image(img.clone()), ctx)
.await?;
}
} else {
for (lsn, val) in logs {
stat.produce_key(&val);
delta_writer.put_value(key, lsn, val, gate, ctx).await?;
delta_writer.put_value(key, lsn, val, ctx).await?;
}
}
first_batch = false;
} else {
for (lsn, val) in logs {
stat.produce_key(&val);
delta_writer.put_value(key, lsn, val, gate, ctx).await?;
delta_writer.put_value(key, lsn, val, ctx).await?;
}
}
}
let KeyLogAtLsn(above_horizon_logs) = self.above_horizon;
for (lsn, val) in above_horizon_logs {
stat.produce_key(&val);
delta_writer.put_value(key, lsn, val, gate, ctx).await?;
delta_writer.put_value(key, lsn, val, ctx).await?;
}
Ok(())
}
@@ -3106,6 +3105,7 @@ impl Timeline {
self.tenant_shard_id,
lowest_retain_lsn..end_lsn,
self.get_compaction_target_size(),
&self.gate,
self.cancel.clone(),
)
.await
@@ -3275,7 +3275,6 @@ impl Timeline {
&mut delta_layer_writer,
image_layer_writer.as_mut(),
&mut stat,
&self.gate,
ctx,
)
.await
@@ -3316,7 +3315,6 @@ impl Timeline {
&mut delta_layer_writer,
image_layer_writer.as_mut(),
&mut stat,
&self.gate,
ctx,
)
.await