mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
pageserver: Make ImageLayerWriter sync, infallible and lazy (#12403)
## Problem ## Summary of changes Make ImageLayerWriter sync, infallible and lazy. Address https://github.com/neondatabase/neon/issues/12389. All unit tests passed.
This commit is contained in:
@@ -182,7 +182,7 @@ impl BatchLayerWriter {
|
||||
/// An image writer that takes images and produces multiple image layers.
|
||||
#[must_use]
|
||||
pub struct SplitImageLayerWriter<'a> {
|
||||
inner: ImageLayerWriter,
|
||||
inner: Option<ImageLayerWriter>,
|
||||
target_layer_size: u64,
|
||||
lsn: Lsn,
|
||||
conf: &'static PageServerConf,
|
||||
@@ -196,7 +196,7 @@ pub struct SplitImageLayerWriter<'a> {
|
||||
|
||||
impl<'a> SplitImageLayerWriter<'a> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn new(
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
@@ -205,22 +205,10 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
target_layer_size: u64,
|
||||
gate: &'a utils::sync::gate::Gate,
|
||||
cancel: CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
) -> Self {
|
||||
Self {
|
||||
target_layer_size,
|
||||
// XXX make this lazy like in SplitDeltaLayerWriter?
|
||||
inner: ImageLayerWriter::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
&(start_key..Key::MAX),
|
||||
lsn,
|
||||
gate,
|
||||
cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
inner: None,
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
@@ -229,7 +217,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
start_key,
|
||||
gate,
|
||||
cancel,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn put_image(
|
||||
@@ -238,12 +226,31 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
img: Bytes,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), PutError> {
|
||||
if self.inner.is_none() {
|
||||
self.inner = Some(
|
||||
ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&(self.start_key..Key::MAX),
|
||||
self.lsn,
|
||||
self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(PutError::Other)?,
|
||||
);
|
||||
}
|
||||
|
||||
let inner = self.inner.as_mut().unwrap();
|
||||
|
||||
// The current estimation is an upper bound of the space that the key/image could take
|
||||
// because we did not consider compression in this estimation. The resulting image layer
|
||||
// could be smaller than the target size.
|
||||
let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
|
||||
if self.inner.num_keys() >= 1
|
||||
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
if inner.num_keys() >= 1
|
||||
&& inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
{
|
||||
let next_image_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
@@ -257,7 +264,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
)
|
||||
.await
|
||||
.map_err(PutError::Other)?;
|
||||
let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
|
||||
let prev_image_writer = std::mem::replace(inner, next_image_writer);
|
||||
self.batches.add_unfinished_image_writer(
|
||||
prev_image_writer,
|
||||
self.start_key..key,
|
||||
@@ -265,7 +272,7 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
);
|
||||
self.start_key = key;
|
||||
}
|
||||
self.inner.put_image(key, img, ctx).await
|
||||
inner.put_image(key, img, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn finish_with_discard_fn<D, F>(
|
||||
@@ -282,8 +289,10 @@ impl<'a> SplitImageLayerWriter<'a> {
|
||||
let Self {
|
||||
mut batches, inner, ..
|
||||
} = self;
|
||||
if inner.num_keys() != 0 {
|
||||
batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
|
||||
if let Some(inner) = inner {
|
||||
if inner.num_keys() != 0 {
|
||||
batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
|
||||
}
|
||||
}
|
||||
batches.finish_with_discard_fn(tline, ctx, discard_fn).await
|
||||
}
|
||||
@@ -498,10 +507,7 @@ mod tests {
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
);
|
||||
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
@@ -577,10 +583,7 @@ mod tests {
|
||||
4 * 1024 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
);
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
@@ -676,10 +679,7 @@ mod tests {
|
||||
4 * 1024,
|
||||
&tline.gate,
|
||||
tline.cancel.clone(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
);
|
||||
|
||||
let mut delta_writer = SplitDeltaLayerWriter::new(
|
||||
tenant.conf,
|
||||
|
||||
@@ -3503,22 +3503,16 @@ impl Timeline {
|
||||
// Only create image layers when there is no ancestor branches. TODO: create covering image layer
|
||||
// when some condition meet.
|
||||
let mut image_layer_writer = if !has_data_below {
|
||||
Some(
|
||||
SplitImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
job_desc.compaction_key_range.start,
|
||||
lowest_retain_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.context("failed to create image layer writer")
|
||||
.map_err(CompactionError::Other)?,
|
||||
)
|
||||
Some(SplitImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
job_desc.compaction_key_range.start,
|
||||
lowest_retain_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
&self.gate,
|
||||
self.cancel.clone(),
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user