From db1ea8b43039de6905f597d590abcab5f71e0626 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 28 Nov 2022 11:47:46 +0200 Subject: [PATCH] WIP: get rid of layer_removal_cs --- pageserver/src/http/routes.rs | 5 +- pageserver/src/tenant.rs | 47 +- pageserver/src/tenant/delta_layer.rs | 21 +- pageserver/src/tenant/image_layer.rs | 21 +- pageserver/src/tenant/inmemory_layer.rs | 10 +- pageserver/src/tenant/storage_layer.rs | 26 +- pageserver/src/tenant/timeline.rs | 787 ++++++++++++------------ pageserver/src/tenant_tasks.rs | 2 +- 8 files changed, 496 insertions(+), 423 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index dff2266033..32f96b3c5c 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -749,7 +749,10 @@ async fn timeline_compact_handler(request: Request) -> Result anyhow::Result<()> { + pub async fn compaction_iteration(&self) -> anyhow::Result<()> { anyhow::ensure!( self.is_active(), "Cannot run compaction iteration on inactive tenant" @@ -1201,16 +1201,19 @@ impl Tenant { // while holding the lock. Then drop the lock and actually perform the // compactions. We don't want to block everything else while the // compaction runs. - let timelines = self.timelines.lock().unwrap(); - let timelines_to_compact = timelines - .iter() - .map(|(timeline_id, timeline)| (*timeline_id, timeline.clone())) - .collect::>(); - drop(timelines); + let timelines_to_compact = { + let timelines = self.timelines.lock().unwrap(); + timelines + .iter() + .map(|(timeline_id, timeline)| (*timeline_id, timeline.clone())) + .collect::>() + }; for (timeline_id, timeline) in &timelines_to_compact { - let _entered = info_span!("compact_timeline", timeline = %timeline_id).entered(); - timeline.compact()?; + timeline + .compact() + .instrument(info_span!("compact_timeline", timeline = %timeline_id)) + .await?; } Ok(()) @@ -1267,7 +1270,8 @@ impl Tenant { let timeline = timeline_entry.get(); timeline.set_state(TimelineState::Paused); - let layer_removal_guard = timeline.layer_removal_guard()?; + // FIXME: Wait for all tasks, including GC and compaction, that are working on the + // timeline, to finish. 
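[Editor's note] The compaction_iteration hunk above switches to the "snapshot under the lock, then await outside it" pattern and replaces span.entered() with .instrument(), since an entered span guard must not be held across an .await. Below is a minimal, self-contained sketch of that pattern, not part of the patch: Tenant/Timeline are stand-ins, the u64 timeline id and the anyhow/tokio/tracing dependencies are assumptions for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tracing::{info_span, Instrument};

// Stand-ins for the pageserver's Tenant/Timeline; only the locking shape matters here.
struct Timeline;

impl Timeline {
    async fn compact(&self) -> anyhow::Result<()> {
        Ok(())
    }
}

struct Tenant {
    timelines: Mutex<HashMap<u64, Arc<Timeline>>>,
}

impl Tenant {
    async fn compaction_iteration(&self) -> anyhow::Result<()> {
        // Snapshot the Arc'd timelines so the std::sync::Mutex guard is dropped
        // before any .await point.
        let timelines_to_compact: Vec<(u64, Arc<Timeline>)> = {
            let timelines = self.timelines.lock().unwrap();
            timelines
                .iter()
                .map(|(id, tl)| (*id, Arc::clone(tl)))
                .collect()
        };

        for (timeline_id, timeline) in &timelines_to_compact {
            // Attach the span with .instrument(): unlike an .entered() guard, it is
            // safe to carry across .await points.
            timeline
                .compact()
                .instrument(info_span!("compact_timeline", timeline = %timeline_id))
                .await?;
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let tenant = Tenant {
        timelines: Mutex::new(HashMap::from([(1u64, Arc::new(Timeline))])),
    };
    tenant.compaction_iteration().await
}
```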
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id); std::fs::remove_dir_all(&local_timeline_directory).with_context(|| { @@ -1278,7 +1282,6 @@ impl Tenant { })?; info!("detach removed files"); - drop(layer_removal_guard); timeline_entry.remove(); Ok(()) @@ -1767,7 +1770,7 @@ impl Tenant { ); } - let result = timeline.gc()?; + let result = timeline.gc().await?; totals += result; } @@ -3057,7 +3060,7 @@ mod tests { drop(writer); tline.checkpoint(CheckpointConfig::Forced).await?; - tline.compact()?; + tline.compact().await?; let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; @@ -3065,7 +3068,7 @@ mod tests { drop(writer); tline.checkpoint(CheckpointConfig::Forced).await?; - tline.compact()?; + tline.compact().await?; let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?; @@ -3073,7 +3076,7 @@ mod tests { drop(writer); tline.checkpoint(CheckpointConfig::Forced).await?; - tline.compact()?; + tline.compact().await?; let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?; @@ -3081,7 +3084,7 @@ mod tests { drop(writer); tline.checkpoint(CheckpointConfig::Forced).await?; - tline.compact()?; + tline.compact().await?; assert_eq!(tline.get(*TEST_KEY, Lsn(0x10))?, TEST_IMG("foo at 0x10")); assert_eq!(tline.get(*TEST_KEY, Lsn(0x1f))?, TEST_IMG("foo at 0x10")); @@ -3131,8 +3134,8 @@ mod tests { tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; tline.checkpoint(CheckpointConfig::Forced).await?; - tline.compact()?; - tline.gc()?; + tline.compact().await?; + tline.gc().await?; } Ok(()) @@ -3203,8 +3206,8 @@ mod tests { let cutoff = tline.get_last_record_lsn(); tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; tline.checkpoint(CheckpointConfig::Forced).await?; - tline.compact()?; - tline.gc()?; + tline.compact().await?; + tline.gc().await?; } Ok(()) @@ -3286,8 +3289,8 @@ mod tests { let cutoff = tline.get_last_record_lsn(); tline.update_gc_info(Vec::new(), cutoff, Duration::ZERO)?; tline.checkpoint(CheckpointConfig::Forced).await?; - tline.compact()?; - tline.gc()?; + tline.compact().await?; + tline.gc().await?; } Ok(()) diff --git a/pageserver/src/tenant/delta_layer.rs b/pageserver/src/tenant/delta_layer.rs index dcd6956640..2873985d05 100644 --- a/pageserver/src/tenant/delta_layer.rs +++ b/pageserver/src/tenant/delta_layer.rs @@ -30,14 +30,15 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::filename::{DeltaFileName, PathOrConf}; -use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; +use crate::tenant::storage_layer::{ + DropNotify, Layer, ValueReconstructResult, ValueReconstructState, +}; use crate::virtual_file::VirtualFile; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{bail, ensure, Context, Result}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; -use std::fs; use std::io::{BufWriter, Write}; use std::io::{Seek, SeekFrom}; use std::ops::Range; @@ -191,6 +192,8 @@ pub struct DeltaLayerInner { /// Reader object for reading blocks from the file. 
(None if not loaded yet) file: Option>, + + drop_watch: Option, } impl Layer for DeltaLayer { @@ -327,10 +330,13 @@ impl Layer for DeltaLayer { } } - fn delete(&self) -> Result<()> { - // delete underlying file - fs::remove_file(self.path())?; - Ok(()) + fn drop_notify(&self) -> DropNotify { + let mut inner = self.inner.write().unwrap(); + + inner + .drop_watch + .get_or_insert_with(|| DropNotify::new()) + .clone() } fn is_incremental(&self) -> bool { @@ -551,6 +557,7 @@ impl DeltaLayer { file: None, index_start_blk: 0, index_root_blk: 0, + drop_watch: None, }), } } @@ -578,6 +585,7 @@ impl DeltaLayer { file: None, index_start_blk: 0, index_root_blk: 0, + drop_watch: None, }), }) } @@ -743,6 +751,7 @@ impl DeltaLayerWriterInner { file: None, index_start_blk, index_root_blk, + drop_watch: None, }), }; diff --git a/pageserver/src/tenant/image_layer.rs b/pageserver/src/tenant/image_layer.rs index 8409d34bc9..fb7a0f77ae 100644 --- a/pageserver/src/tenant/image_layer.rs +++ b/pageserver/src/tenant/image_layer.rs @@ -26,7 +26,9 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::filename::{ImageFileName, PathOrConf}; -use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; +use crate::tenant::storage_layer::{ + DropNotify, Layer, ValueReconstructResult, ValueReconstructState, +}; use crate::virtual_file::VirtualFile; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{bail, ensure, Context, Result}; @@ -34,7 +36,6 @@ use bytes::Bytes; use hex; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; -use std::fs; use std::io::Write; use std::io::{Seek, SeekFrom}; use std::ops::Range; @@ -117,6 +118,8 @@ pub struct ImageLayerInner { /// Reader object for reading blocks from the file. 
(None if not loaded yet) file: Option>, + + drop_watch: Option, } impl Layer for ImageLayer { @@ -184,10 +187,13 @@ impl Layer for ImageLayer { todo!(); } - fn delete(&self) -> Result<()> { - // delete underlying file - fs::remove_file(self.path())?; - Ok(()) + fn drop_notify(&self) -> DropNotify { + let mut inner = self.inner.write().unwrap(); + + inner + .drop_watch + .get_or_insert_with(|| DropNotify::new()) + .clone() } fn is_incremental(&self) -> bool { @@ -351,6 +357,7 @@ impl ImageLayer { file: None, index_start_blk: 0, index_root_blk: 0, + drop_watch: None, }), } } @@ -378,6 +385,7 @@ impl ImageLayer { loaded: false, index_start_blk: 0, index_root_blk: 0, + drop_watch: None, }), }) } @@ -532,6 +540,7 @@ impl ImageLayerWriterInner { file: None, index_start_blk, index_root_blk, + drop_watch: None, }), }; diff --git a/pageserver/src/tenant/inmemory_layer.rs b/pageserver/src/tenant/inmemory_layer.rs index 9aa33a72ca..3fcc660234 100644 --- a/pageserver/src/tenant/inmemory_layer.rs +++ b/pageserver/src/tenant/inmemory_layer.rs @@ -10,9 +10,11 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter}; use crate::tenant::block_io::BlockReader; use crate::tenant::delta_layer::{DeltaLayer, DeltaLayerWriter}; use crate::tenant::ephemeral_file::EphemeralFile; -use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; +use crate::tenant::storage_layer::{ + DropNotify, Layer, ValueReconstructResult, ValueReconstructState, +}; use crate::walrecord; -use anyhow::{bail, ensure, Result}; +use anyhow::{ensure, Result}; use std::cell::RefCell; use std::collections::HashMap; use tracing::*; @@ -172,8 +174,8 @@ impl Layer for InMemoryLayer { /// Nothing to do here. When you drop the last reference to the layer, it will /// be deallocated. - fn delete(&self) -> Result<()> { - bail!("can't delete an InMemoryLayer") + fn drop_notify(&self) -> DropNotify { + panic!("can't delete an InMemoryLayer") } fn is_incremental(&self) -> bool { diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 8dafcab124..7c3527be2b 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -145,9 +145,31 @@ pub trait Layer: Send + Sync { panic!("Not implemented") } - /// Permanently remove this layer from disk. - fn delete(&self) -> Result<()>; + fn drop_notify(&self) -> DropNotify; /// Dump summary of the contents of the layer to stdout fn dump(&self, verbose: bool) -> Result<()>; } + +#[derive(Clone)] +pub struct DropNotify(std::sync::Arc); + +impl DropNotify { + pub fn new() -> Self { + DropNotify(std::sync::Arc::new(tokio::sync::Notify::new())) + } + + pub async fn dropped(&self) { + self.0.notified().await + } + + pub fn notify_waiters(&self) { + self.0.notify_waiters(); + } +} + +impl Drop for DropNotify { + fn drop(&mut self) { + self.0.notify_waiters(); + } +} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b5c62324a7..194741148f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -144,12 +144,6 @@ pub struct Timeline { /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>, - /// Layer removal lock. - /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks. - /// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`], - /// and [`Tenant::delete_timeline`]. 
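[Editor's note] The storage_layer.rs hunk above introduces DropNotify as the replacement for Layer::delete(). A standalone sketch of the intended interplay follows; it mirrors the type added in the patch but simplifies the layer to a bare struct, and the 10 ms sleep is only there to make the ordering explicit. Note that tokio's Notify::notify_waiters wakes tasks already parked in notified() and stores no permit, so the waiter has to grab its clone and start waiting before the last layer reference is dropped.

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;

/// Mirror of the DropNotify added in storage_layer.rs: clones share one Notify,
/// and dropping a clone wakes tasks currently waiting in dropped().
#[derive(Clone)]
struct DropNotify(Arc<Notify>);

impl DropNotify {
    fn new() -> Self {
        DropNotify(Arc::new(Notify::new()))
    }
    async fn dropped(&self) {
        self.0.notified().await
    }
}

impl Drop for DropNotify {
    fn drop(&mut self) {
        // notify_waiters() only wakes tasks already parked in notified(); it does
        // not buffer a notification for a waiter that registers later.
        self.0.notify_waiters();
    }
}

// Stand-in for a layer file: the real DeltaLayer/ImageLayer keep the DropNotify in
// their inner state and hand out clones via drop_notify().
struct Layer {
    drop_watch: DropNotify,
}

#[tokio::main]
async fn main() {
    let layer = Layer {
        drop_watch: DropNotify::new(),
    };
    let notify = layer.drop_watch.clone();

    // The deleter detaches the layer from the layer map first (not shown), then
    // waits for the last in-memory reference to go away before unlinking the file.
    let deleter = tokio::spawn(async move {
        notify.dropped().await;
        println!("layer dropped; safe to remove the file now");
    });

    // Give the deleter a chance to start waiting, then drop the last reference.
    tokio::time::sleep(Duration::from_millis(10)).await;
    drop(layer);

    deleter.await.unwrap();
}
```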
- layer_removal_cs: Mutex<()>, - // Needed to ensure that we can't create a branch at a point that was already garbage collected pub latest_gc_cutoff_lsn: Rcu, @@ -504,12 +498,12 @@ impl Timeline { CheckpointConfig::Forced => { self.freeze_inmem_layer(false); self.flush_frozen_layers_and_wait().await?; - self.compact() + self.compact().await } } } - pub fn compact(&self) -> anyhow::Result<()> { + pub async fn compact(&self) -> anyhow::Result<()> { let last_record_lsn = self.get_last_record_lsn(); // Last record Lsn could be zero in case the timelie was just created @@ -552,8 +546,6 @@ impl Timeline { // Below are functions compact_level0() and create_image_layers() // but they are a bit ad hoc and don't quite work like it's explained // above. Rewrite it. - let _layer_removal_cs = self.layer_removal_cs.lock().unwrap(); - let target_file_size = self.get_checkpoint_distance(); // Define partitioning schema if needed @@ -574,7 +566,7 @@ impl Timeline { // 3. Compact let timer = self.metrics.compact_time_histo.start_timer(); - self.compact_level0(target_file_size)?; + self.compact_level0(target_file_size).await?; timer.stop_and_record(); } Err(err) => { @@ -776,7 +768,6 @@ impl Timeline { layer_flush_done_tx, write_lock: Mutex::new(()), - layer_removal_cs: Mutex::new(()), gc_info: RwLock::new(GcInfo { retain_lsns: Vec::new(), @@ -1208,12 +1199,6 @@ impl Timeline { Ok(()) } - pub(super) fn layer_removal_guard(&self) -> anyhow::Result> { - self.layer_removal_cs - .try_lock() - .map_err(|e| anyhow!("cannot lock compaction critical section {e}")) - } - fn try_spawn_size_init_task(self: &Arc, init_lsn: Lsn) { // Atomically check if the timeline size calculation had already started. // If the flag was not already set, this sets it. @@ -1981,291 +1966,311 @@ impl Timeline { /// Collect a bunch of Level 0 layer files, and compact and reshuffle them as /// as Level 1 files. /// - fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> { - let layers = self.layers.read().unwrap(); - let mut level0_deltas = layers.get_level0_deltas()?; - drop(layers); + async fn compact_level0(&self, target_file_size: u64) -> anyhow::Result<()> { + let mut deltas_to_compact; + let mut new_layers; - // Only compact if enough layers have accumulated. - if level0_deltas.is_empty() || level0_deltas.len() < self.get_compaction_threshold() { - return Ok(()); - } + { + let mut level0_deltas = { + let layers = self.layers.read().unwrap(); + layers.get_level0_deltas()? + }; - // Gather the files to compact in this iteration. - // - // Start with the oldest Level 0 delta file, and collect any other - // level 0 files that form a contiguous sequence, such that the end - // LSN of previous file matches the start LSN of the next file. - // - // Note that if the files don't form such a sequence, we might - // "compact" just a single file. That's a bit pointless, but it allows - // us to get rid of the level 0 file, and compact the other files on - // the next iteration. This could probably made smarter, but such - // "gaps" in the sequence of level 0 files should only happen in case - // of a crash, partial download from cloud storage, or something like - // that, so it's not a big deal in practice. 
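[Editor's note] The comment above describes how compact_level0 gathers its input: start from the oldest level-0 delta and keep collecting files while each one's start LSN equals the previous file's end LSN. A tiny self-contained illustration of that selection rule, using plain u64 ranges instead of the pageserver's Lsn type:

```rust
use std::ops::Range;

// Sort level-0 deltas by start LSN and keep the prefix whose ranges chain
// end-to-start; the first gap ends the run, and whatever is left over gets
// compacted on a later iteration.
fn contiguous_l0_prefix(mut level0: Vec<Range<u64>>) -> Vec<Range<u64>> {
    level0.sort_by_key(|r| r.start);
    let mut iter = level0.into_iter();
    let mut run = Vec::new();
    if let Some(first) = iter.next() {
        let mut prev_end = first.end;
        run.push(first);
        for r in iter {
            if r.start != prev_end {
                break; // gap in the LSN sequence: stop here
            }
            prev_end = r.end;
            run.push(r);
        }
    }
    run
}

fn main() {
    let deltas = vec![30..40, 10..20, 20..30, 50..60];
    // 10..20, 20..30 and 30..40 form a contiguous run; 50..60 waits for the next pass.
    assert_eq!(contiguous_l0_prefix(deltas), vec![10..20, 20..30, 30..40]);
}
```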
- level0_deltas.sort_by_key(|l| l.get_lsn_range().start); - let mut level0_deltas_iter = level0_deltas.iter(); - - let first_level0_delta = level0_deltas_iter.next().unwrap(); - let mut prev_lsn_end = first_level0_delta.get_lsn_range().end; - let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)]; - for l in level0_deltas_iter { - let lsn_range = l.get_lsn_range(); - - if lsn_range.start != prev_lsn_end { - break; + // Only compact if enough layers have accumulated. + if level0_deltas.is_empty() || level0_deltas.len() < self.get_compaction_threshold() { + return Ok(()); } - deltas_to_compact.push(Arc::clone(l)); - prev_lsn_end = lsn_range.end; - } - let lsn_range = Range { - start: deltas_to_compact.first().unwrap().get_lsn_range().start, - end: deltas_to_compact.last().unwrap().get_lsn_range().end, - }; - info!( - "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)", - lsn_range.start, - lsn_range.end, - deltas_to_compact.len(), - level0_deltas.len() - ); - for l in deltas_to_compact.iter() { - info!("compact includes {}", l.filename().display()); - } - // We don't need the original list of layers anymore. Drop it so that - // we don't accidentally use it later in the function. - drop(level0_deltas); + // Gather the files to compact in this iteration. + // + // Start with the oldest Level 0 delta file, and collect any other + // level 0 files that form a contiguous sequence, such that the end + // LSN of previous file matches the start LSN of the next file. + // + // Note that if the files don't form such a sequence, we might + // "compact" just a single file. That's a bit pointless, but it allows + // us to get rid of the level 0 file, and compact the other files on + // the next iteration. This could probably made smarter, but such + // "gaps" in the sequence of level 0 files should only happen in case + // of a crash, partial download from cloud storage, or something like + // that, so it's not a big deal in practice. + level0_deltas.sort_by_key(|l| l.get_lsn_range().start); + let mut level0_deltas_iter = level0_deltas.iter(); - // This iterator walks through all key-value pairs from all the layers - // we're compacting, in key, LSN order. - let all_values_iter = deltas_to_compact - .iter() - .map(|l| l.iter()) - .kmerge_by(|a, b| { - if let Ok((a_key, a_lsn, _)) = a { - if let Ok((b_key, b_lsn, _)) = b { + let first_level0_delta = level0_deltas_iter.next().unwrap(); + let mut prev_lsn_end = first_level0_delta.get_lsn_range().end; + deltas_to_compact = vec![Arc::clone(first_level0_delta)]; + for l in level0_deltas_iter { + let lsn_range = l.get_lsn_range(); + + if lsn_range.start != prev_lsn_end { + break; + } + deltas_to_compact.push(Arc::clone(l)); + prev_lsn_end = lsn_range.end; + } + let lsn_range = Range { + start: deltas_to_compact.first().unwrap().get_lsn_range().start, + end: deltas_to_compact.last().unwrap().get_lsn_range().end, + }; + + info!( + "Starting Level0 compaction in LSN range {}-{} for {} layers ({} deltas in total)", + lsn_range.start, + lsn_range.end, + deltas_to_compact.len(), + level0_deltas.len() + ); + for l in deltas_to_compact.iter() { + info!("compact includes {}", l.filename().display()); + } + // We don't need the original list of layers anymore. Drop it so that + // we don't accidentally use it later in the function. + drop(level0_deltas); + + // This iterator walks through all key-value pairs from all the layers + // we're compacting, in key, LSN order. 
+ let all_values_iter = deltas_to_compact + .iter() + .map(|l| l.iter()) + .kmerge_by(|a, b| { + if let Ok((a_key, a_lsn, _)) = a { + if let Ok((b_key, b_lsn, _)) = b { + match a_key.cmp(b_key) { + Ordering::Less => true, + Ordering::Equal => a_lsn <= b_lsn, + Ordering::Greater => false, + } + } else { + false + } + } else { + true + } + }); + + // This iterator walks through all keys and is needed to calculate size used by each key + let mut all_keys_iter = + deltas_to_compact + .iter() + .map(|l| l.key_iter()) + .kmerge_by(|a, b| { + let (a_key, a_lsn, _) = a; + let (b_key, b_lsn, _) = b; match a_key.cmp(b_key) { Ordering::Less => true, Ordering::Equal => a_lsn <= b_lsn, Ordering::Greater => false, } - } else { - false + }); + + // Merge the contents of all the input delta layers into a new set + // of delta layers, based on the current partitioning. + // + // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one. + // It's possible that there is a single key with so many page versions that storing all of them in a single layer file + // would be too large. In that case, we also split on the LSN dimension. + // + // LSN + // ^ + // | + // | +-----------+ +--+--+--+--+ + // | | | | | | | | + // | +-----------+ | | | | | + // | | | | | | | | + // | +-----------+ ==> | | | | | + // | | | | | | | | + // | +-----------+ | | | | | + // | | | | | | | | + // | +-----------+ +--+--+--+--+ + // | + // +--------------> key + // + // + // If one key (X) has a lot of page versions: + // + // LSN + // ^ + // | (X) + // | +-----------+ +--+--+--+--+ + // | | | | | | | | + // | +-----------+ | | +--+ | + // | | | | | | | | + // | +-----------+ ==> | | | | | + // | | | | | +--+ | + // | +-----------+ | | | | | + // | | | | | | | | + // | +-----------+ +--+--+--+--+ + // | + // +--------------> key + // TODO: this actually divides the layers into fixed-size chunks, not + // based on the partitioning. + // + // TODO: we should also opportunistically materialize and + // garbage collect what we can. + new_layers = Vec::new(); + let mut prev_key: Option = None; + let mut writer: Option = None; + let mut key_values_total_size = 0u64; + let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key + let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key + for x in all_values_iter { + let (key, lsn, value) = x?; + let same_key = prev_key.map_or(false, |prev_key| prev_key == key); + // We need to check key boundaries once we reach next key or end of layer with the same key + if !same_key || lsn == dup_end_lsn { + let mut next_key_size = 0u64; + let is_dup_layer = dup_end_lsn.is_valid(); + dup_start_lsn = Lsn::INVALID; + if !same_key { + dup_end_lsn = Lsn::INVALID; } - } else { - true - } - }); - - // This iterator walks through all keys and is needed to calculate size used by each key - let mut all_keys_iter = deltas_to_compact - .iter() - .map(|l| l.key_iter()) - .kmerge_by(|a, b| { - let (a_key, a_lsn, _) = a; - let (b_key, b_lsn, _) = b; - match a_key.cmp(b_key) { - Ordering::Less => true, - Ordering::Equal => a_lsn <= b_lsn, - Ordering::Greater => false, - } - }); - - // Merge the contents of all the input delta layers into a new set - // of delta layers, based on the current partitioning. 
- // - // We split the new delta layers on the key dimension. We iterate through the key space, and for each key, check if including the next key to the current output layer we're building would cause the layer to become too large. If so, dump the current output layer and start new one. - // It's possible that there is a single key with so many page versions that storing all of them in a single layer file - // would be too large. In that case, we also split on the LSN dimension. - // - // LSN - // ^ - // | - // | +-----------+ +--+--+--+--+ - // | | | | | | | | - // | +-----------+ | | | | | - // | | | | | | | | - // | +-----------+ ==> | | | | | - // | | | | | | | | - // | +-----------+ | | | | | - // | | | | | | | | - // | +-----------+ +--+--+--+--+ - // | - // +--------------> key - // - // - // If one key (X) has a lot of page versions: - // - // LSN - // ^ - // | (X) - // | +-----------+ +--+--+--+--+ - // | | | | | | | | - // | +-----------+ | | +--+ | - // | | | | | | | | - // | +-----------+ ==> | | | | | - // | | | | | +--+ | - // | +-----------+ | | | | | - // | | | | | | | | - // | +-----------+ +--+--+--+--+ - // | - // +--------------> key - // TODO: this actually divides the layers into fixed-size chunks, not - // based on the partitioning. - // - // TODO: we should also opportunistically materialize and - // garbage collect what we can. - let mut new_layers = Vec::new(); - let mut prev_key: Option = None; - let mut writer: Option = None; - let mut key_values_total_size = 0u64; - let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key - let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key - for x in all_values_iter { - let (key, lsn, value) = x?; - let same_key = prev_key.map_or(false, |prev_key| prev_key == key); - // We need to check key boundaries once we reach next key or end of layer with the same key - if !same_key || lsn == dup_end_lsn { - let mut next_key_size = 0u64; - let is_dup_layer = dup_end_lsn.is_valid(); - dup_start_lsn = Lsn::INVALID; - if !same_key { - dup_end_lsn = Lsn::INVALID; - } - // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size - for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() { - next_key_size = next_size; - if key != next_key { - if dup_end_lsn.is_valid() { - // We are writting segment with duplicates: - // place all remaining values of this key in separate segment - dup_start_lsn = dup_end_lsn; // new segments starts where old stops - dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range + // Determine size occupied by this key. We stop at next key or when size becomes larger than target_file_size + for (next_key, next_lsn, next_size) in all_keys_iter.by_ref() { + next_key_size = next_size; + if key != next_key { + if dup_end_lsn.is_valid() { + // We are writting segment with duplicates: + // place all remaining values of this key in separate segment + dup_start_lsn = dup_end_lsn; // new segments starts where old stops + dup_end_lsn = lsn_range.end; // there are no more values of this key till end of LSN range + } + break; + } + key_values_total_size += next_size; + // Check if it is time to split segment: if total keys size is larger than target file size. + // We need to avoid generation of empty segments if next_size > target_file_size. 
+ if key_values_total_size > target_file_size && lsn != next_lsn { + // Split key between multiple layers: such layer can contain only single key + dup_start_lsn = if dup_end_lsn.is_valid() { + dup_end_lsn // new segment with duplicates starts where old one stops + } else { + lsn // start with the first LSN for this key + }; + dup_end_lsn = next_lsn; // upper LSN boundary is exclusive + break; } - break; } - key_values_total_size += next_size; - // Check if it is time to split segment: if total keys size is larger than target file size. - // We need to avoid generation of empty segments if next_size > target_file_size. - if key_values_total_size > target_file_size && lsn != next_lsn { - // Split key between multiple layers: such layer can contain only single key - dup_start_lsn = if dup_end_lsn.is_valid() { - dup_end_lsn // new segment with duplicates starts where old one stops + // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set. + if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() { + dup_start_lsn = dup_end_lsn; + dup_end_lsn = lsn_range.end; + } + if writer.is_some() { + let written_size = writer.as_mut().unwrap().size(); + // check if key cause layer overflow... + if is_dup_layer + || dup_end_lsn.is_valid() + || written_size + key_values_total_size > target_file_size + { + // ... if so, flush previous layer and prepare to write new one + new_layers + .push(writer.take().unwrap().finish(prev_key.unwrap().next())?); + writer = None; + } + } + // Remember size of key value because at next iteration we will access next item + key_values_total_size = next_key_size; + } + if writer.is_none() { + // Create writer if not initiaized yet + writer = Some(DeltaLayerWriter::new( + self.conf, + self.timeline_id, + self.tenant_id, + key, + if dup_end_lsn.is_valid() { + // this is a layer containing slice of values of the same key + debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn); + dup_start_lsn..dup_end_lsn } else { - lsn // start with the first LSN for this key - }; - dup_end_lsn = next_lsn; // upper LSN boundary is exclusive - break; - } + debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); + lsn_range.clone() + }, + )?); } - // handle case when loop reaches last key: in this case dup_end is non-zero but dup_start is not set. - if dup_end_lsn.is_valid() && !dup_start_lsn.is_valid() { - dup_start_lsn = dup_end_lsn; - dup_end_lsn = lsn_range.end; + + fail_point!("delta-layer-writer-fail-before-finish", |_| { + anyhow::bail!("failpoint delta-layer-writer-fail-before-finish"); + }); + + writer.as_mut().unwrap().put_value(key, lsn, value)?; + prev_key = Some(key); + } + if let Some(writer) = writer { + new_layers.push(writer.finish(prev_key.unwrap().next())?); + } + + // Sync layers + if !new_layers.is_empty() { + let mut layer_paths: Vec = new_layers.iter().map(|l| l.path()).collect(); + + // also sync the directory + layer_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id)); + + // Fsync all the layer files and directory using multiple threads to + // minimize latency. 
+ par_fsync::par_fsync(&layer_paths)?; + + layer_paths.pop().unwrap(); + } + } + + let files_to_delete = { + let mut layers = self.layers.write().unwrap(); + let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); + for l in new_layers { + let new_delta_path = l.path(); + + let metadata = new_delta_path.metadata()?; + + if let Some(remote_client) = &self.remote_client { + remote_client.schedule_layer_file_upload( + &new_delta_path, + &LayerFileMetadata::new(metadata.len()), + )?; } - if writer.is_some() { - let written_size = writer.as_mut().unwrap().size(); - // check if key cause layer overflow... - if is_dup_layer - || dup_end_lsn.is_valid() - || written_size + key_values_total_size > target_file_size - { - // ... if so, flush previous layer and prepare to write new one - new_layers.push(writer.take().unwrap().finish(prev_key.unwrap().next())?); - writer = None; - } + + // update the timeline's physical size + self.metrics.current_physical_size_gauge.add(metadata.len()); + + new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); + layers.insert_historic(Arc::new(l)); + } + + // Now that we have reshuffled the data to set of new delta layers, we can + // delete the old ones XXX + let mut files_to_delete = Vec::with_capacity(deltas_to_compact.len()); + for l in deltas_to_compact.into_iter() { + if let Some(path) = l.local_path() { + files_to_delete.push((l.drop_notify(), path)); } - // Remember size of key value because at next iteration we will access next item - key_values_total_size = next_key_size; - } - if writer.is_none() { - // Create writer if not initiaized yet - writer = Some(DeltaLayerWriter::new( - self.conf, - self.timeline_id, - self.tenant_id, - key, - if dup_end_lsn.is_valid() { - // this is a layer containing slice of values of the same key - debug!("Create new dup layer {}..{}", dup_start_lsn, dup_end_lsn); - dup_start_lsn..dup_end_lsn - } else { - debug!("Create new layer {}..{}", lsn_range.start, lsn_range.end); - lsn_range.clone() - }, - )?); + layers.remove_historic(l); } + drop(layers); - fail_point!("delta-layer-writer-fail-before-finish", |_| { - anyhow::bail!("failpoint delta-layer-writer-fail-before-finish"); - }); + files_to_delete + }; - writer.as_mut().unwrap().put_value(key, lsn, value)?; - prev_key = Some(key); + // Perform the deletions + for (drop_notify, path) in files_to_delete.iter() { + drop_notify.dropped().await; + self.metrics + .current_physical_size_gauge + .sub(path.metadata()?.len()); + fs::remove_file(path)?; } - if let Some(writer) = writer { - new_layers.push(writer.finish(prev_key.unwrap().next())?); - } - - // Sync layers - if !new_layers.is_empty() { - let mut layer_paths: Vec = new_layers.iter().map(|l| l.path()).collect(); - - // also sync the directory - layer_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id)); - - // Fsync all the layer files and directory using multiple threads to - // minimize latency. 
- par_fsync::par_fsync(&layer_paths)?; - - layer_paths.pop().unwrap(); - } - - let mut layers = self.layers.write().unwrap(); - let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); - for l in new_layers { - let new_delta_path = l.path(); - - let metadata = new_delta_path.metadata()?; - - if let Some(remote_client) = &self.remote_client { - remote_client.schedule_layer_file_upload( - &new_delta_path, - &LayerFileMetadata::new(metadata.len()), - )?; - } - - // update the timeline's physical size - self.metrics.current_physical_size_gauge.add(metadata.len()); - - new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); - layers.insert_historic(Arc::new(l)); - } - - // Now that we have reshuffled the data to set of new delta layers, we can - // delete the old ones - let mut layer_paths_to_delete = Vec::with_capacity(deltas_to_compact.len()); - drop(all_keys_iter); - for l in deltas_to_compact { - if let Some(path) = l.local_path() { - self.metrics - .current_physical_size_gauge - .sub(path.metadata()?.len()); - layer_paths_to_delete.push(path); - } - l.delete()?; - layers.remove_historic(l); - } - drop(layers); // Also schedule the deletions in remote storage if let Some(remote_client) = &self.remote_client { // FIXME: This also uploads new index file. If // flush_frozen_layer() is doing this at the same time, do // we have a problem? - remote_client.schedule_layer_file_deletion(&layer_paths_to_delete)?; + let paths_to_delete = files_to_delete + .into_iter() + .map(|(_, path)| path) + .collect::>(); + remote_client.schedule_layer_file_deletion(&paths_to_delete)?; } Ok(()) @@ -2357,25 +2362,29 @@ impl Timeline { /// within a layer file. We can only remove the whole file if it's fully /// obsolete. /// - pub(super) fn gc(&self) -> anyhow::Result { + pub(super) async fn gc(&self) -> anyhow::Result { let mut result: GcResult = GcResult::default(); let now = SystemTime::now(); fail_point!("before-timeline-gc"); - let _layer_removal_cs = self.layer_removal_cs.lock().unwrap(); + let horizon_cutoff; + let pitr_cutoff; + let retain_lsns; + { + let gc_info = self.gc_info.read().unwrap(); - let gc_info = self.gc_info.read().unwrap(); - - let horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn()); - let pitr_cutoff = gc_info.pitr_cutoff; - let retain_lsns = &gc_info.retain_lsns; + horizon_cutoff = min(gc_info.horizon_cutoff, self.get_disk_consistent_lsn()); + pitr_cutoff = gc_info.pitr_cutoff; + retain_lsns = gc_info.retain_lsns.clone(); + } let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff); - let _enter = - info_span!("gc_timeline", timeline = %self.timeline_id, cutoff = %new_gc_cutoff) - .entered(); + // FIXME + //let _enter = + // info_span!("gc_timeline", timeline = %self.timeline_id, cutoff = %new_gc_cutoff) + // .entered(); // Nothing to GC. Return early. let latest_gc_cutoff = *self.get_latest_gc_cutoff_lsn(); @@ -2419,114 +2428,126 @@ impl Timeline { // 3. it doesn't need to be retained for 'retain_lsns'; // 4. newer on-disk image layers cover the layer's whole key range // - let mut layers = self.layers.write().unwrap(); - 'outer: for l in layers.iter_historic_layers() { - // This layer is in the process of being flushed to disk. - // It will be swapped out of the layer map, replaced with - // on-disk layers containing the same data. - // We can't GC it, as it's not on disk. We can't remove it - // from the layer map yet, as it would make its data - // inaccessible. 
- if l.is_in_memory() { - continue; - } + let mut files_to_delete; + { + let mut layers = self.layers.write().unwrap(); + 'outer: for l in layers.iter_historic_layers() { + // This layer is in the process of being flushed to disk. + // It will be swapped out of the layer map, replaced with + // on-disk layers containing the same data. + // We can't GC it, as it's not on disk. We can't remove it + // from the layer map yet, as it would make its data + // inaccessible. + if l.is_in_memory() { + continue; + } - result.layers_total += 1; + result.layers_total += 1; - // 1. Is it newer than GC horizon cutoff point? - if l.get_lsn_range().end > horizon_cutoff { - debug!( - "keeping {} because it's newer than horizon_cutoff {}", - l.filename().display(), - horizon_cutoff - ); - result.layers_needed_by_cutoff += 1; - continue 'outer; - } - - // 2. It is newer than PiTR cutoff point? - if l.get_lsn_range().end > pitr_cutoff { - debug!( - "keeping {} because it's newer than pitr_cutoff {}", - l.filename().display(), - pitr_cutoff - ); - result.layers_needed_by_pitr += 1; - continue 'outer; - } - - // 3. Is it needed by a child branch? - // NOTE With that we would keep data that - // might be referenced by child branches forever. - // We can track this in child timeline GC and delete parent layers when - // they are no longer needed. This might be complicated with long inheritance chains. - for retain_lsn in retain_lsns { - // start_lsn is inclusive - if &l.get_lsn_range().start <= retain_lsn { + // 1. Is it newer than GC horizon cutoff point? + if l.get_lsn_range().end > horizon_cutoff { debug!( - "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}", + "keeping {} because it's newer than horizon_cutoff {}", l.filename().display(), - retain_lsn, - l.is_incremental(), + horizon_cutoff ); - result.layers_needed_by_branches += 1; + result.layers_needed_by_cutoff += 1; continue 'outer; } - } - // 4. Is there a later on-disk layer for this relation? - // - // The end-LSN is exclusive, while disk_consistent_lsn is - // inclusive. For example, if disk_consistent_lsn is 100, it is - // OK for a delta layer to have end LSN 101, but if the end LSN - // is 102, then it might not have been fully flushed to disk - // before crash. - // - // For example, imagine that the following layers exist: - // - // 1000 - image (A) - // 1000-2000 - delta (B) - // 2000 - image (C) - // 2000-3000 - delta (D) - // 3000 - image (E) - // - // If GC horizon is at 2500, we can remove layers A and B, but - // we cannot remove C, even though it's older than 2500, because - // the delta layer 2000-3000 depends on it. - if !layers - .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))? - { + // 2. It is newer than PiTR cutoff point? + if l.get_lsn_range().end > pitr_cutoff { + debug!( + "keeping {} because it's newer than pitr_cutoff {}", + l.filename().display(), + pitr_cutoff + ); + result.layers_needed_by_pitr += 1; + continue 'outer; + } + + // 3. Is it needed by a child branch? + // NOTE With that we would keep data that + // might be referenced by child branches forever. + // We can track this in child timeline GC and delete parent layers when + // they are no longer needed. This might be complicated with long inheritance chains. 
+ for retain_lsn in retain_lsns.iter() { + // start_lsn is inclusive + if l.get_lsn_range().start <= *retain_lsn { + debug!( + "keeping {} because it's still might be referenced by child branch forked at {} is_dropped: xx is_incremental: {}", + l.filename().display(), + retain_lsn, + l.is_incremental(), + ); + result.layers_needed_by_branches += 1; + continue 'outer; + } + } + + // 4. Is there a later on-disk layer for this relation? + // + // The end-LSN is exclusive, while disk_consistent_lsn is + // inclusive. For example, if disk_consistent_lsn is 100, it is + // OK for a delta layer to have end LSN 101, but if the end LSN + // is 102, then it might not have been fully flushed to disk + // before crash. + // + // For example, imagine that the following layers exist: + // + // 1000 - image (A) + // 1000-2000 - delta (B) + // 2000 - image (C) + // 2000-3000 - delta (D) + // 3000 - image (E) + // + // If GC horizon is at 2500, we can remove layers A and B, but + // we cannot remove C, even though it's older than 2500, because + // the delta layer 2000-3000 depends on it. + if !layers.image_layer_exists( + &l.get_key_range(), + &(l.get_lsn_range().end..new_gc_cutoff), + )? { + debug!( + "keeping {} because it is the latest layer", + l.filename().display() + ); + result.layers_not_updated += 1; + continue 'outer; + } + + // We didn't find any reason to keep this file, so remove it. debug!( - "keeping {} because it is the latest layer", - l.filename().display() + "garbage collecting {} is_dropped: xx is_incremental: {}", + l.filename().display(), + l.is_incremental(), ); - result.layers_not_updated += 1; - continue 'outer; + layers_to_remove.push(Arc::clone(&l)); } - // We didn't find any reason to keep this file, so remove it. - debug!( - "garbage collecting {} is_dropped: xx is_incremental: {}", - l.filename().display(), - l.is_incremental(), - ); - layers_to_remove.push(Arc::clone(&l)); + // Actually delete the layers from disk and remove them from the map. + // (couldn't do this in the loop above, because you cannot modify a collection + // while iterating it. BTreeMap::retain() would be another option) + files_to_delete = Vec::with_capacity(layers_to_remove.len()); + for doomed_layer in layers_to_remove.into_iter() { + if let Some(path) = doomed_layer.local_path() { + self.metrics + .current_physical_size_gauge + .sub(path.metadata()?.len()); + files_to_delete.push((doomed_layer.drop_notify(), path)); + } + layers.remove_historic(doomed_layer); + result.layers_removed += 1; + } } - // Actually delete the layers from disk and remove them from the map. - // (couldn't do this in the loop above, because you cannot modify a collection - // while iterating it. 
BTreeMap::retain() would be another option) - let mut layer_paths_to_delete = Vec::with_capacity(layers_to_remove.len()); - for doomed_layer in layers_to_remove { - if let Some(path) = doomed_layer.local_path() { - self.metrics - .current_physical_size_gauge - .sub(path.metadata()?.len()); - layer_paths_to_delete.push(path); - } - doomed_layer.delete()?; - layers.remove_historic(doomed_layer); - result.layers_removed += 1; + // Perform the deletions + for (drop_notify, path) in files_to_delete.iter() { + drop_notify.dropped().await; + self.metrics + .current_physical_size_gauge + .sub(path.metadata()?.len()); + fs::remove_file(path)?; } info!( @@ -2539,7 +2560,11 @@ impl Timeline { } if let Some(remote_client) = &self.remote_client { - remote_client.schedule_layer_file_deletion(&layer_paths_to_delete)?; + let paths_to_delete = files_to_delete + .into_iter() + .map(|(_, path)| path) + .collect::>(); + remote_client.schedule_layer_file_deletion(&paths_to_delete)?; } result.elapsed = now.elapsed()?; diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs index 28cfb6925a..cef3e50690 100644 --- a/pageserver/src/tenant_tasks.rs +++ b/pageserver/src/tenant_tasks.rs @@ -84,7 +84,7 @@ async fn compaction_loop(tenant_id: TenantId) { // Run compaction let mut sleep_duration = tenant.get_compaction_period(); - if let Err(e) = tenant.compaction_iteration() { + if let Err(e) = tenant.compaction_iteration().await { sleep_duration = wait_duration; error!("Compaction failed, retrying in {:?}: {e:?}", sleep_duration); }
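[Editor's note] The final hunk only changes the call site to await the now-async compaction_iteration. For context, here is a simplified, self-contained version of the kind of background loop it sits in; the durations, the bare Tenant type, and the spawn/abort harness are assumptions for illustration, not the pageserver's actual task framework.

```rust
use std::sync::Arc;
use std::time::Duration;
use tracing::error;

struct Tenant;

impl Tenant {
    fn get_compaction_period(&self) -> Duration {
        Duration::from_secs(20)
    }
    async fn compaction_iteration(&self) -> anyhow::Result<()> {
        Ok(())
    }
}

async fn compaction_loop(tenant: Arc<Tenant>) {
    // On failure, retry sooner than the normal compaction period.
    let retry_after = Duration::from_secs(2);
    loop {
        let mut sleep_duration = tenant.get_compaction_period();
        if let Err(e) = tenant.compaction_iteration().await {
            sleep_duration = retry_after;
            error!("Compaction failed, retrying in {:?}: {e:?}", sleep_duration);
        }
        tokio::time::sleep(sleep_duration).await;
    }
}

#[tokio::main]
async fn main() {
    // In the pageserver this runs as a managed per-tenant background task;
    // here we just spawn it, let it run briefly, and stop it.
    let handle = tokio::spawn(compaction_loop(Arc::new(Tenant)));
    tokio::time::sleep(Duration::from_millis(50)).await;
    handle.abort();
}
```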