mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
@@ -353,6 +353,11 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
|
||||
/// the previous non-incremental layer.
|
||||
fn is_incremental(&self) -> bool;
|
||||
|
||||
/// Is this a delta layer?
///
/// The default implementation answers `false`; a layer type that is a
/// delta layer overrides this method to return `true`.
|
||||
fn is_delta(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
///
|
||||
/// Return data needed to reconstruct given page at LSN.
|
||||
///
|
||||
|
||||
@@ -399,6 +399,10 @@ impl Layer for DeltaLayer {
|
||||
// Delegates to the layer descriptor: returns whatever
// `self.layer_desc().short_id()` produces.
fn short_id(&self) -> String {
|
||||
self.layer_desc().short_id()
|
||||
}
|
||||
|
||||
// Overrides the `Layer::is_delta` default: a `DeltaLayer` always reports
// itself as a delta layer.
fn is_delta(&self) -> bool {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for DeltaLayer {
|
||||
|
||||
@@ -3359,7 +3359,7 @@ struct CompactLevel0Phase1Result {
|
||||
|
||||
/// Result record named after "compact tiered, phase 1".
///
/// `Default` is derived, so an empty result can be built with
/// `Default::default()`.
///
/// NOTE(review): presumably carries the layers produced by the first phase of
/// tiered compaction plus the tier bookkeeping (`new_tier_at`,
/// `removed_tiers`) — confirm against the code that constructs it.
#[derive(Default)]
|
||||
struct CompactTieredPhase1Result {
|
||||
new_layers: Vec<DeltaLayer>,
|
||||
new_layers: Vec<Arc<dyn PersistentLayer>>,
|
||||
new_tier_at: usize,
|
||||
removed_tiers: Vec<usize>,
|
||||
}
|
||||
@@ -3842,11 +3842,9 @@ impl Timeline {
|
||||
let mut total_size_up_to_lvl = 0;
|
||||
let mut compact_tiers = Vec::new();
|
||||
for (tier_id, size) in tier_sizes {
|
||||
if total_size_up_to_lvl != 0 {
|
||||
if total_size_up_to_lvl as f64 / size as f64 > size_ratio {
|
||||
info!("full compaction triggered by size ratio");
|
||||
return Some(compact_tiers);
|
||||
}
|
||||
if total_size_up_to_lvl != 0 && total_size_up_to_lvl as f64 / size as f64 > size_ratio {
|
||||
info!("full compaction triggered by size ratio");
|
||||
return Some(compact_tiers);
|
||||
}
|
||||
total_size_up_to_lvl += size;
|
||||
compact_tiers.push(tier_id);
|
||||
@@ -3903,6 +3901,7 @@ impl Timeline {
|
||||
let deltas_to_compact_layers = deltas_to_compact_layers
|
||||
.into_iter()
|
||||
.map(|l| self.lcache.get_from_desc(&l))
|
||||
.filter(|l| l.is_delta())
|
||||
.collect_vec();
|
||||
|
||||
let lsn_range = {
|
||||
@@ -3974,15 +3973,54 @@ impl Timeline {
|
||||
|
||||
// TODO(chi): merge with compact l0
|
||||
|
||||
let mut new_layers = Vec::new();
|
||||
let mut new_layers: Vec<Arc<dyn PersistentLayer>> = Vec::new();
|
||||
let mut prev_key: Option<Key> = None;
|
||||
let mut writer: Option<DeltaLayerWriter> = None;
|
||||
let mut image_writer: Option<ImageLayerWriter> = None;
|
||||
let mut key_values_total_size = 0u64;
|
||||
let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
|
||||
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
|
||||
let mut same_key_cnt = 0;
|
||||
let mut construct_image_for_key = false;
|
||||
let image_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
|
||||
for x in all_values_iter {
|
||||
let (key, lsn, value) = x?;
|
||||
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
|
||||
if same_key {
|
||||
same_key_cnt += 1;
|
||||
} else {
|
||||
same_key_cnt = 1;
|
||||
construct_image_for_key = false;
|
||||
}
|
||||
if same_key_cnt >= 20 && !construct_image_for_key {
|
||||
let img = match self.get(key, image_lsn, ctx).await {
|
||||
Ok(img) => img,
|
||||
Err(err) => {
|
||||
if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
|
||||
warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
|
||||
ZERO_PAGE.clone()
|
||||
} else {
|
||||
return Err(CompactionError::Other(err.into()));
|
||||
}
|
||||
}
|
||||
};
|
||||
if image_writer.is_none() {
|
||||
// Create writer if not initialized yet
|
||||
image_writer = Some(ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
// TODO(chi): should not use the full key range
|
||||
&(key..Key::MAX),
|
||||
image_lsn,
|
||||
true,
|
||||
)?);
|
||||
}
|
||||
image_writer.as_mut().unwrap().put_image(key, &img)?;
|
||||
construct_image_for_key = true;
|
||||
}
|
||||
|
||||
// We need to check key boundaries once we reach next key or end of layer with the same key
|
||||
if !same_key || lsn == dup_end_lsn {
|
||||
let mut next_key_size = 0u64;
|
||||
@@ -4030,7 +4068,9 @@ impl Timeline {
|
||||
|| written_size + key_values_total_size > target_file_size
|
||||
{
|
||||
// ... if so, flush previous layer and prepare to write new one
|
||||
new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?);
|
||||
new_layers.push(Arc::new(
|
||||
writer.take().unwrap().finish(prev_key.unwrap().next())?,
|
||||
));
|
||||
writer = None;
|
||||
}
|
||||
}
|
||||
@@ -4063,12 +4103,17 @@ impl Timeline {
|
||||
prev_key = Some(key);
|
||||
}
|
||||
if let Some(writer) = writer {
|
||||
new_layers.push(writer.finish(prev_key.unwrap().next())?);
|
||||
new_layers.push(Arc::new(writer.finish(prev_key.unwrap().next())?));
|
||||
}
|
||||
|
||||
if let Some(image_writer) = image_writer {
|
||||
new_layers.push(Arc::new(image_writer.finish()?));
|
||||
}
|
||||
|
||||
// Sync layers
|
||||
if !new_layers.is_empty() {
|
||||
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
|
||||
let mut layer_paths: Vec<PathBuf> =
|
||||
new_layers.iter().map(|l| l.local_path().unwrap()).collect();
|
||||
|
||||
// Fsync all the layer files and directory using multiple threads to
|
||||
// minimize latency.
|
||||
@@ -4148,7 +4193,7 @@ impl Timeline {
|
||||
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
|
||||
let mut new_layer_descs = vec![];
|
||||
for l in new_layers {
|
||||
let new_path = l.path();
|
||||
let new_path = l.local_path().unwrap();
|
||||
|
||||
let metadata = new_path.metadata().with_context(|| {
|
||||
format!(
|
||||
@@ -4170,15 +4215,14 @@ impl Timeline {
|
||||
.add(metadata.len());
|
||||
|
||||
new_layer_paths.insert(new_path, LayerFileMetadata::new(metadata.len()));
|
||||
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
|
||||
x.access_stats().record_residence_event(
|
||||
l.access_stats().record_residence_event(
|
||||
&updates,
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
updates.insert_historic_new(x.layer_desc().clone());
|
||||
new_layer_descs.push(x.layer_desc().clone().into());
|
||||
self.lcache.create_new_layer(x);
|
||||
updates.insert_historic_new(l.layer_desc().clone());
|
||||
new_layer_descs.push(l.layer_desc().clone().into());
|
||||
self.lcache.create_new_layer(l);
|
||||
}
|
||||
|
||||
let new_tier_id = updates.next_tier_id();
|
||||
|
||||
Reference in New Issue
Block a user