mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 14:00:38 +00:00
@@ -486,12 +486,14 @@ struct ImageLayerWriterInner {
|
||||
path: PathBuf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
key_range: Range<Key>,
|
||||
lsn: Lsn,
|
||||
is_incremental: bool,
|
||||
|
||||
blob_writer: WriteBlobWriter<VirtualFile>,
|
||||
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
|
||||
|
||||
start_key: Key,
|
||||
last_key: Option<Key>,
|
||||
}
|
||||
|
||||
impl ImageLayerWriterInner {
|
||||
@@ -502,8 +504,8 @@ impl ImageLayerWriterInner {
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
start_key: Key,
|
||||
is_incremental: bool,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Create the file initially with a temporary filename.
|
||||
@@ -513,7 +515,7 @@ impl ImageLayerWriterInner {
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
&ImageFileName {
|
||||
key_range: key_range.clone(),
|
||||
key_range: start_key..start_key, // TODO(chi): use number instead of dummy range
|
||||
lsn,
|
||||
},
|
||||
);
|
||||
@@ -535,11 +537,12 @@ impl ImageLayerWriterInner {
|
||||
path,
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
key_range: key_range.clone(),
|
||||
lsn,
|
||||
tree: tree_builder,
|
||||
blob_writer,
|
||||
is_incremental,
|
||||
start_key,
|
||||
last_key: None,
|
||||
};
|
||||
|
||||
Ok(writer)
|
||||
@@ -551,7 +554,14 @@ impl ImageLayerWriterInner {
|
||||
/// The page versions must be appended in blknum order.
|
||||
///
|
||||
fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
|
||||
ensure!(self.key_range.contains(&key));
|
||||
if cfg!(debug_assertions) {
|
||||
ensure!(key >= self.start_key);
|
||||
if let Some(last_key) = self.last_key.as_ref() {
|
||||
ensure!(last_key < &key);
|
||||
}
|
||||
self.last_key = Some(key.clone());
|
||||
}
|
||||
|
||||
let off = self.blob_writer.write_blob(img)?;
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
@@ -564,7 +574,7 @@ impl ImageLayerWriterInner {
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
fn finish(self) -> anyhow::Result<ImageLayer> {
|
||||
fn finish(self, end_key: Key) -> anyhow::Result<ImageLayer> {
|
||||
let index_start_blk =
|
||||
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
|
||||
|
||||
@@ -577,13 +587,15 @@ impl ImageLayerWriterInner {
|
||||
file.write_all(buf.as_ref())?;
|
||||
}
|
||||
|
||||
let key_range = self.start_key.clone()..end_key;
|
||||
|
||||
// Fill in the summary on blk 0
|
||||
let summary = Summary {
|
||||
magic: IMAGE_FILE_MAGIC,
|
||||
format_version: STORAGE_FORMAT_VERSION,
|
||||
tenant_id: self.tenant_id,
|
||||
timeline_id: self.timeline_id,
|
||||
key_range: self.key_range.clone(),
|
||||
key_range: key_range.clone(),
|
||||
lsn: self.lsn,
|
||||
index_start_blk,
|
||||
index_root_blk,
|
||||
@@ -598,7 +610,7 @@ impl ImageLayerWriterInner {
|
||||
let desc = PersistentLayerDesc::new_img(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
self.key_range.clone(),
|
||||
key_range.clone(),
|
||||
self.lsn,
|
||||
self.is_incremental, // for now, image layer ALWAYS covers the full range
|
||||
metadata.len(),
|
||||
@@ -632,7 +644,7 @@ impl ImageLayerWriterInner {
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
&ImageFileName {
|
||||
key_range: self.key_range.clone(),
|
||||
key_range,
|
||||
lsn: self.lsn,
|
||||
},
|
||||
);
|
||||
@@ -642,6 +654,10 @@ impl ImageLayerWriterInner {
|
||||
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
fn size(&self) -> u64 {
|
||||
self.blob_writer.size() + self.tree.borrow_writer().size()
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder object for constructing a new image layer.
|
||||
@@ -678,7 +694,7 @@ impl ImageLayerWriter {
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
key_range: &Range<Key>,
|
||||
start_key: Key,
|
||||
lsn: Lsn,
|
||||
is_incremental: bool,
|
||||
) -> anyhow::Result<ImageLayerWriter> {
|
||||
@@ -687,8 +703,8 @@ impl ImageLayerWriter {
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
key_range,
|
||||
lsn,
|
||||
start_key,
|
||||
is_incremental,
|
||||
)?),
|
||||
})
|
||||
@@ -706,8 +722,12 @@ impl ImageLayerWriter {
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
pub fn finish(mut self) -> anyhow::Result<ImageLayer> {
|
||||
self.inner.take().unwrap().finish()
|
||||
pub fn finish(mut self, end_key: Key) -> anyhow::Result<ImageLayer> {
|
||||
self.inner.take().unwrap().finish(end_key)
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
self.inner.as_ref().unwrap().size()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -173,14 +173,16 @@ impl PersistentLayerDesc {
|
||||
|
||||
pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
|
||||
"----- layer for ten {} tli {} keys {}-{} lsn {}-{} size {} is_delta {} is_incremental {} ----",
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
self.key_range.start,
|
||||
self.key_range.end,
|
||||
self.lsn_range.start,
|
||||
self.lsn_range.end,
|
||||
self.file_size
|
||||
self.file_size,
|
||||
self.is_delta,
|
||||
self.is_incremental
|
||||
);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -3235,7 +3235,7 @@ impl Timeline {
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
&img_range,
|
||||
img_range.start.clone(),
|
||||
lsn,
|
||||
false, // image layer always covers the full range
|
||||
)?;
|
||||
@@ -3278,7 +3278,7 @@ impl Timeline {
|
||||
key = key.next();
|
||||
}
|
||||
}
|
||||
let image_layer = image_layer_writer.finish()?;
|
||||
let image_layer = image_layer_writer.finish(img_range.end.clone())?;
|
||||
image_layers.push(image_layer);
|
||||
}
|
||||
}
|
||||
@@ -3749,6 +3749,7 @@ impl Timeline {
|
||||
fn get_compact_task(tier_sizes: Vec<(usize, u64)>) -> Option<Vec<usize>> {
|
||||
let size_ratio = 1.25;
|
||||
let space_amplification_ratio = 2.0;
|
||||
let max_merge_width = 20;
|
||||
|
||||
// Trigger 1: by space amplification, do full compaction
|
||||
let total_tier_size = tier_sizes.iter().map(|(_, size)| *size).sum::<u64>();
|
||||
@@ -3756,12 +3757,14 @@ impl Timeline {
|
||||
let estimated_space_amp = (total_tier_size - last_tier_size) as f64 / last_tier_size as f64;
|
||||
if estimated_space_amp > space_amplification_ratio {
|
||||
info!("full compaction triggered by space amplification");
|
||||
return Some(
|
||||
tier_sizes
|
||||
.iter()
|
||||
.map(|(tier_id, _)| *tier_id)
|
||||
.collect::<Vec<_>>(),
|
||||
);
|
||||
let tiers = tier_sizes
|
||||
.iter()
|
||||
.rev()
|
||||
.take(max_merge_width)
|
||||
.rev()
|
||||
.map(|(tier_id, _)| *tier_id)
|
||||
.collect::<Vec<_>>();
|
||||
return Some(tiers);
|
||||
}
|
||||
|
||||
// Trigger 2: by size ratio
|
||||
@@ -3770,6 +3773,13 @@ impl Timeline {
|
||||
for (tier_id, size) in tier_sizes {
|
||||
if total_size_up_to_lvl != 0 && size as f64 / total_size_up_to_lvl as f64 > size_ratio {
|
||||
info!("compaction triggered by size ratio");
|
||||
let compact_tiers = compact_tiers
|
||||
.iter()
|
||||
.rev()
|
||||
.take(max_merge_width)
|
||||
.rev()
|
||||
.copied()
|
||||
.collect_vec();
|
||||
return Some(compact_tiers);
|
||||
}
|
||||
total_size_up_to_lvl += size;
|
||||
@@ -3914,6 +3924,8 @@ impl Timeline {
|
||||
let mut construct_image_for_key = false;
|
||||
let image_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
|
||||
const PAGE_MATERIALIZE_THRESHOLD: usize = 32;
|
||||
|
||||
for x in all_values_iter {
|
||||
let (key, lsn, value) = x?;
|
||||
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
|
||||
@@ -3923,7 +3935,7 @@ impl Timeline {
|
||||
same_key_cnt = 1;
|
||||
construct_image_for_key = false;
|
||||
}
|
||||
if same_key_cnt >= 20 && !construct_image_for_key {
|
||||
if same_key_cnt >= PAGE_MATERIALIZE_THRESHOLD && !construct_image_for_key {
|
||||
let img = match self.get(key, image_lsn, ctx).await {
|
||||
Ok(img) => img,
|
||||
Err(err) => {
|
||||
@@ -3942,13 +3954,21 @@ impl Timeline {
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
// TODO(chi): should not use the full key range
|
||||
&(key..Key::MAX),
|
||||
key.clone(),
|
||||
image_lsn,
|
||||
true,
|
||||
)?);
|
||||
}
|
||||
image_writer.as_mut().unwrap().put_image(key, &img)?;
|
||||
|
||||
let image_writer_mut = image_writer.as_mut().unwrap();
|
||||
image_writer_mut.put_image(key, &img)?;
|
||||
construct_image_for_key = true;
|
||||
|
||||
let written_size: u64 = image_writer_mut.size();
|
||||
if written_size + key_values_total_size > target_file_size {
|
||||
new_layers.push(Arc::new(image_writer.take().unwrap().finish(key.next())?));
|
||||
image_writer = None;
|
||||
}
|
||||
}
|
||||
|
||||
// We need to check key boundaries once we reach next key or end of layer with the same key
|
||||
@@ -4033,11 +4053,11 @@ impl Timeline {
|
||||
prev_key = Some(key);
|
||||
}
|
||||
if let Some(writer) = writer {
|
||||
new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next())?));
|
||||
new_layers.push(Arc::new(writer.finish(prev_key.as_ref().unwrap().next())?));
|
||||
}
|
||||
|
||||
if let Some(image_writer) = image_writer {
|
||||
new_layers.push(Arc::new(image_writer.finish()?));
|
||||
new_layers.push(Arc::new(image_writer.finish(prev_key.unwrap().next())?));
|
||||
}
|
||||
|
||||
// Sync layers
|
||||
|
||||
Reference in New Issue
Block a user