mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 03:30:36 +00:00
feat(pageserver): split delta writer automatically determines key range (#8850)
close https://github.com/neondatabase/neon/issues/8838 ## Summary of changes This patch modifies the split delta layer writer to avoid taking start_key and end_key when creating/finishing the layer writer. The start_key for the delta layers will be the first key provided to the layer writer, and the end_key would be the `last_key.next()`. This simplifies the delta layer writer API. On that, the layer key hack is removed. Image layers now use the full key range, and delta layers use the first/last key provided by the user. --------- Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -263,15 +263,6 @@ impl Key {
|
||||
field5: u8::MAX,
|
||||
field6: u32::MAX,
|
||||
};
|
||||
/// A key slightly smaller than [`Key::MAX`] for use in layer key ranges to avoid them to be confused with L0 layers
|
||||
pub const NON_L0_MAX: Key = Key {
|
||||
field1: u8::MAX,
|
||||
field2: u32::MAX,
|
||||
field3: u32::MAX,
|
||||
field4: u32::MAX,
|
||||
field5: u8::MAX,
|
||||
field6: u32::MAX - 1,
|
||||
};
|
||||
|
||||
pub fn from_hex(s: &str) -> Result<Self> {
|
||||
if s.len() != 36 {
|
||||
|
||||
@@ -7091,13 +7091,13 @@ mod tests {
|
||||
vec![
|
||||
// Image layer at GC horizon
|
||||
PersistentLayerKey {
|
||||
key_range: Key::MIN..Key::NON_L0_MAX,
|
||||
key_range: Key::MIN..Key::MAX,
|
||||
lsn_range: Lsn(0x30)..Lsn(0x31),
|
||||
is_delta: false
|
||||
},
|
||||
// The delta layer covers the full range (with the layer key hack to avoid being recognized as L0)
|
||||
// The delta layer below the horizon
|
||||
PersistentLayerKey {
|
||||
key_range: Key::MIN..Key::NON_L0_MAX,
|
||||
key_range: get_key(3)..get_key(4),
|
||||
lsn_range: Lsn(0x30)..Lsn(0x48),
|
||||
is_delta: true
|
||||
},
|
||||
|
||||
@@ -188,7 +188,7 @@ impl SplitImageLayerWriter {
|
||||
.await
|
||||
}
|
||||
|
||||
/// When split writer fails, the caller should call this function and handle partially generated layers.
|
||||
/// This function will be deprecated with #8841.
|
||||
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
|
||||
Ok((self.generated_layers, self.inner))
|
||||
}
|
||||
@@ -204,7 +204,7 @@ impl SplitImageLayerWriter {
|
||||
/// will split them into multiple files based on size.
|
||||
#[must_use]
|
||||
pub struct SplitDeltaLayerWriter {
|
||||
inner: DeltaLayerWriter,
|
||||
inner: Option<(Key, DeltaLayerWriter)>,
|
||||
target_layer_size: u64,
|
||||
generated_layers: Vec<SplitWriterResult>,
|
||||
conf: &'static PageServerConf,
|
||||
@@ -212,7 +212,6 @@ pub struct SplitDeltaLayerWriter {
|
||||
tenant_shard_id: TenantShardId,
|
||||
lsn_range: Range<Lsn>,
|
||||
last_key_written: Key,
|
||||
start_key: Key,
|
||||
}
|
||||
|
||||
impl SplitDeltaLayerWriter {
|
||||
@@ -220,29 +219,18 @@ impl SplitDeltaLayerWriter {
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
start_key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
target_layer_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
target_layer_size,
|
||||
inner: DeltaLayerWriter::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
start_key,
|
||||
lsn_range.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
inner: None,
|
||||
generated_layers: Vec::new(),
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
lsn_range,
|
||||
last_key_written: Key::MIN,
|
||||
start_key,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -265,9 +253,26 @@ impl SplitDeltaLayerWriter {
|
||||
//
|
||||
// Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
|
||||
// strategy. https://github.com/neondatabase/neon/issues/8837
|
||||
|
||||
if self.inner.is_none() {
|
||||
self.inner = Some((
|
||||
key,
|
||||
DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
));
|
||||
}
|
||||
let (_, inner) = self.inner.as_mut().unwrap();
|
||||
|
||||
let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
|
||||
if self.inner.num_keys() >= 1
|
||||
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
if inner.num_keys() >= 1
|
||||
&& inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
{
|
||||
if key != self.last_key_written {
|
||||
let next_delta_writer = DeltaLayerWriter::new(
|
||||
@@ -279,13 +284,13 @@ impl SplitDeltaLayerWriter {
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
|
||||
let (start_key, prev_delta_writer) =
|
||||
std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
|
||||
let layer_key = PersistentLayerKey {
|
||||
key_range: self.start_key..key,
|
||||
key_range: start_key..key,
|
||||
lsn_range: self.lsn_range.clone(),
|
||||
is_delta: true,
|
||||
};
|
||||
self.start_key = key;
|
||||
if discard(&layer_key).await {
|
||||
drop(prev_delta_writer);
|
||||
self.generated_layers
|
||||
@@ -296,17 +301,18 @@ impl SplitDeltaLayerWriter {
|
||||
self.generated_layers
|
||||
.push(SplitWriterResult::Produced(delta_layer));
|
||||
}
|
||||
} else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT {
|
||||
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
|
||||
// We have to produce a very large file b/c a key is updated too often.
|
||||
anyhow::bail!(
|
||||
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
|
||||
key,
|
||||
self.inner.estimated_size()
|
||||
inner.estimated_size()
|
||||
);
|
||||
}
|
||||
}
|
||||
self.last_key_written = key;
|
||||
self.inner.put_value(key, lsn, val, ctx).await
|
||||
let (_, inner) = self.inner.as_mut().unwrap();
|
||||
inner.put_value(key, lsn, val, ctx).await
|
||||
}
|
||||
|
||||
pub async fn put_value(
|
||||
@@ -325,7 +331,6 @@ impl SplitDeltaLayerWriter {
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
discard: D,
|
||||
) -> anyhow::Result<Vec<SplitWriterResult>>
|
||||
where
|
||||
@@ -337,11 +342,15 @@ impl SplitDeltaLayerWriter {
|
||||
inner,
|
||||
..
|
||||
} = self;
|
||||
let Some((start_key, inner)) = inner else {
|
||||
return Ok(generated_layers);
|
||||
};
|
||||
if inner.num_keys() == 0 {
|
||||
return Ok(generated_layers);
|
||||
}
|
||||
let end_key = self.last_key_written.next();
|
||||
let layer_key = PersistentLayerKey {
|
||||
key_range: self.start_key..end_key,
|
||||
key_range: start_key..end_key,
|
||||
lsn_range: self.lsn_range.clone(),
|
||||
is_delta: true,
|
||||
};
|
||||
@@ -360,15 +369,14 @@ impl SplitDeltaLayerWriter {
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
) -> anyhow::Result<Vec<SplitWriterResult>> {
|
||||
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
|
||||
self.finish_with_discard_fn(tline, ctx, |_| async { false })
|
||||
.await
|
||||
}
|
||||
|
||||
/// When split writer fails, the caller should call this function and handle partially generated layers.
|
||||
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, DeltaLayerWriter)> {
|
||||
Ok((self.generated_layers, self.inner))
|
||||
/// This function will be deprecated with #8841.
|
||||
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, Option<DeltaLayerWriter>)> {
|
||||
Ok((self.generated_layers, self.inner.map(|x| x.1)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -432,10 +440,8 @@ mod tests {
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -460,11 +466,22 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(10))
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
|
||||
assert_eq!(layers.len(), 1);
|
||||
assert_eq!(
|
||||
layers
|
||||
.into_iter()
|
||||
.next()
|
||||
.unwrap()
|
||||
.into_resident_layer()
|
||||
.layer_desc()
|
||||
.key(),
|
||||
PersistentLayerKey {
|
||||
key_range: get_key(0)..get_key(1),
|
||||
lsn_range: Lsn(0x18)..Lsn(0x20),
|
||||
is_delta: true
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -501,10 +518,8 @@ mod tests {
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -533,10 +548,7 @@ mod tests {
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
let delta_layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
|
||||
if discard {
|
||||
for layer in image_layers {
|
||||
layer.into_discarded_layer();
|
||||
@@ -555,6 +567,14 @@ mod tests {
|
||||
.collect_vec();
|
||||
assert_eq!(image_layers.len(), N / 512 + 1);
|
||||
assert_eq!(delta_layers.len(), N / 512 + 1);
|
||||
assert_eq!(
|
||||
delta_layers.first().unwrap().layer_desc().key_range.start,
|
||||
get_key(0)
|
||||
);
|
||||
assert_eq!(
|
||||
delta_layers.last().unwrap().layer_desc().key_range.end,
|
||||
get_key(N as u32)
|
||||
);
|
||||
for idx in 0..image_layers.len() {
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
@@ -602,10 +622,8 @@ mod tests {
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -644,11 +662,35 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(10))
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
|
||||
assert_eq!(layers.len(), 2);
|
||||
let mut layers_iter = layers.into_iter();
|
||||
assert_eq!(
|
||||
layers_iter
|
||||
.next()
|
||||
.unwrap()
|
||||
.into_resident_layer()
|
||||
.layer_desc()
|
||||
.key(),
|
||||
PersistentLayerKey {
|
||||
key_range: get_key(0)..get_key(1),
|
||||
lsn_range: Lsn(0x18)..Lsn(0x20),
|
||||
is_delta: true
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
layers_iter
|
||||
.next()
|
||||
.unwrap()
|
||||
.into_resident_layer()
|
||||
.layer_desc()
|
||||
.key(),
|
||||
PersistentLayerKey {
|
||||
key_range: get_key(1)..get_key(2),
|
||||
lsn_range: Lsn(0x18)..Lsn(0x20),
|
||||
is_delta: true
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -668,10 +710,8 @@ mod tests {
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -689,10 +729,20 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
let delta_layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
|
||||
assert_eq!(delta_layers.len(), 1);
|
||||
let delta_layer = delta_layers
|
||||
.into_iter()
|
||||
.next()
|
||||
.unwrap()
|
||||
.into_resident_layer();
|
||||
assert_eq!(
|
||||
delta_layer.layer_desc().key(),
|
||||
PersistentLayerKey {
|
||||
key_range: get_key(0)..get_key(1),
|
||||
lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
|
||||
is_delta: true
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1809,7 +1809,6 @@ impl Timeline {
|
||||
.unwrap();
|
||||
// We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized
|
||||
// as an L0 layer.
|
||||
let hack_end_key = Key::NON_L0_MAX;
|
||||
let mut delta_layers = Vec::new();
|
||||
let mut image_layers = Vec::new();
|
||||
let mut downloaded_layers = Vec::new();
|
||||
@@ -1855,10 +1854,8 @@ impl Timeline {
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
Key::MIN,
|
||||
lowest_retain_lsn..end_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1965,7 +1962,7 @@ impl Timeline {
|
||||
let produced_image_layers = if let Some(writer) = image_layer_writer {
|
||||
if !dry_run {
|
||||
writer
|
||||
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
|
||||
.finish_with_discard_fn(self, ctx, Key::MAX, discard)
|
||||
.await?
|
||||
} else {
|
||||
let (layers, _) = writer.take()?;
|
||||
@@ -1978,7 +1975,7 @@ impl Timeline {
|
||||
|
||||
let produced_delta_layers = if !dry_run {
|
||||
delta_layer_writer
|
||||
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
|
||||
.finish_with_discard_fn(self, ctx, discard)
|
||||
.await?
|
||||
} else {
|
||||
let (layers, _) = delta_layer_writer.take()?;
|
||||
|
||||
Reference in New Issue
Block a user