diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index 992e7cf22f..287c04f3e0 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -71,7 +71,7 @@ static TIMEOUT: Duration = Duration::from_secs(60);
 // FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
 // would be more appropriate. But a low value forces the code to be exercised more,
 // which is good for now to trigger bugs.
-static OLDEST_INMEM_DISTANCE: u64 = 16 * 1024 * 1024;
+static OLDEST_INMEM_DISTANCE: i128 = 16 * 1024 * 1024;
 
 // Metrics collected on operations on the storage repository.
 lazy_static! {
@@ -1074,6 +1074,18 @@ impl LayeredTimeline {
     /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
     /// know anything about them here in the repository.
     fn checkpoint_internal(&self, force: bool) -> Result<()> {
+        // Grab lock on the layer map.
+        //
+        // TODO: We hold it locked throughout the checkpoint operation. That's bad,
+        // the checkpointing could take many seconds, and any incoming get_page_at_lsn()
+        // requests will block.
+        let mut layers = self.layers.lock().unwrap();
+
+        // Bump the generation number in the layer map, so that we can distinguish
+        // entries inserted after the checkpoint started
+        let current_generation = layers.increment_generation();
+
+        // Read 'last_record_lsn'. That becomes the cutoff LSN for frozen layers.
         let RecordLsn {
             last: last_record_lsn,
             prev: prev_record_lsn,
@@ -1085,13 +1097,6 @@ impl LayeredTimeline {
             last_record_lsn
         );
 
-        // Grab lock on the layer map.
-        //
-        // TODO: We hold it locked throughout the checkpoint operation. That's bad,
-        // the checkpointing could take many seconds, and any incoming get_page_at_lsn()
-        // requests will block.
-        let mut layers = self.layers.lock().unwrap();
-
         // Take the in-memory layer with the oldest WAL record. If it's older
         // than the threshold, write it out to disk as a new image and delta file.
         // Repeat until all remaining in-memory layers are within the threshold.
@@ -1102,14 +1107,26 @@ impl LayeredTimeline {
         // check, though. We should also aim at flushing layers that consume
         // a lot of memory and/or aren't receiving much updates anymore.
         let mut disk_consistent_lsn = last_record_lsn;
-        while let Some(oldest_layer) = layers.peek_oldest_open() {
-            // Does this layer need freezing?
+
+        while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
             let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
-            let distance = last_record_lsn.0 - oldest_pending_lsn.0;
-            if !force && distance < OLDEST_INMEM_DISTANCE {
+
+            // Does this layer need freezing?
+            //
+            // Write out all in-memory layers that contain WAL older than OLDEST_INMEM_DISTANCE.
+            // Or if 'force' is true, write out all of them. If we reach a layer with the same
+            // generation number, we know that we have cycled through all layers that were open
+            // when we started. We don't want to process layers inserted after we started, to
+            // avoid getting into an infinite loop trying to process again entries that we
+            // inserted ourselves.
+            let distance = last_record_lsn.widening_sub(oldest_pending_lsn);
+            if distance < 0
+                || (!force && distance < OLDEST_INMEM_DISTANCE)
+                || oldest_generation == current_generation
+            {
                 info!(
                     "the oldest layer is now {} which is {} bytes behind last_record_lsn",
-                    oldest_layer.get_seg_tag(),
+                    oldest_layer.filename().display(),
                     distance
                 );
                 disk_consistent_lsn = oldest_pending_lsn;
diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs
index 4c837b423c..7a859a1551 100644
--- a/pageserver/src/layered_repository/layer_map.rs
+++ b/pageserver/src/layered_repository/layer_map.rs
@@ -43,6 +43,10 @@ pub struct LayerMap {
     /// This allows easy access to the in-memory layer that contains the
     /// oldest WAL record.
     open_segs: BinaryHeap<OpenSegEntry>,
+
+    /// Generation number, used to distinguish newly inserted entries in the
+    /// binary heap from older entries during checkpoint.
+    current_generation: u64,
 }
 
 ///
@@ -59,9 +63,13 @@ struct SegEntry {
 
 /// Entry held LayerMap.open_segs, with boilerplate comparison
 /// routines to implement a min-heap ordered by 'oldest_pending_lsn'
+///
+/// Each entry also carries a generation number. It can be used to distinguish
+/// entries with the same 'oldest_pending_lsn'.
 struct OpenSegEntry {
     pub oldest_pending_lsn: Lsn,
     pub layer: Arc<InMemoryLayer>,
+    pub generation: u64,
 }
 impl Ord for OpenSegEntry {
     fn cmp(&self, other: &Self) -> Ordering {
@@ -73,10 +81,13 @@
 impl PartialOrd for OpenSegEntry {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
-        // to get that.
-        other
-            .oldest_pending_lsn
-            .partial_cmp(&self.oldest_pending_lsn)
+        // to get that. Entries with identical oldest_pending_lsn are ordered by generation
+        Some(
+            other
+                .oldest_pending_lsn
+                .cmp(&self.oldest_pending_lsn)
+                .then_with(|| other.generation.cmp(&self.generation)),
+        )
     }
 }
 impl PartialEq for OpenSegEntry {
@@ -151,6 +162,7 @@ impl LayerMap {
         let opensegentry = OpenSegEntry {
             oldest_pending_lsn: layer.get_oldest_pending_lsn(),
             layer: layer,
+            generation: self.current_generation,
         };
         self.open_segs.push(opensegentry);
 
@@ -290,15 +302,23 @@ impl LayerMap {
         false
     }
 
-    /// Return the oldest in-memory layer.
-    pub fn peek_oldest_open(&self) -> Option<Arc<InMemoryLayer>> {
+    /// Return the oldest in-memory layer, along with its generation number.
+    pub fn peek_oldest_open(&self) -> Option<(Arc<InMemoryLayer>, u64)> {
         if let Some(opensegentry) = self.open_segs.peek() {
-            Some(Arc::clone(&opensegentry.layer))
+            Some((Arc::clone(&opensegentry.layer), opensegentry.generation))
         } else {
             None
         }
     }
 
+    /// Increment the generation number used to stamp open in-memory layers. Layers
+    /// added with `insert_open` after this call will be associated with the new
+    /// generation. Returns the new generation number.
+    pub fn increment_generation(&mut self) -> u64 {
+        self.current_generation += 1;
+        self.current_generation
+    }
+
     pub fn iter_historic_layers(&self) -> HistoricLayerIter {
         HistoricLayerIter {
             segiter: self.segs.iter(),
@@ -312,6 +332,7 @@ impl Default for LayerMap {
         LayerMap {
             segs: HashMap::new(),
             open_segs: BinaryHeap::new(),
+            current_generation: 0,
         }
     }
 }
diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs
index 0f865964a4..1afd8fb962 100644
--- a/zenith_utils/src/lsn.rs
+++ b/zenith_utils/src/lsn.rs
@@ -32,6 +32,12 @@ impl Lsn {
         self.0.checked_sub(other).map(Lsn)
     }
 
+    /// Subtract a number, returning the difference as i128 to avoid overflow.
+    pub fn widening_sub<T: Into<u64>>(self, other: T) -> i128 {
+        let other: u64 = other.into();
+        i128::from(self.0) - i128::from(other)
+    }
+
     /// Parse an LSN from a filename in the form `0000000000000000`
     pub fn from_filename<F>(filename: F) -> Result<Self, LsnParseError>
     where
@@ -264,6 +270,11 @@ mod tests {
         assert_eq!(Lsn(1234).checked_sub(1233u64), Some(Lsn(1)));
         assert_eq!(Lsn(1234).checked_sub(1235u64), None);
 
+        assert_eq!(Lsn(1235).widening_sub(1234u64), 1);
+        assert_eq!(Lsn(1234).widening_sub(1235u64), -1);
+        assert_eq!(Lsn(u64::MAX).widening_sub(0u64), i128::from(u64::MAX));
+        assert_eq!(Lsn(0).widening_sub(u64::MAX), -i128::from(u64::MAX));
+
         let seg_sz: usize = 16 * 1024 * 1024;
         assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7);
         assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64);
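
For context, a standalone sketch of the pattern the new checkpoint loop relies on (this is not code from the patch; the `Entry` type and the LSN values are invented for illustration): entries pushed onto the min-heap are stamped with the current generation, the consumer bumps the generation before draining, and it stops as soon as the top of the heap carries its own generation, so entries it re-inserts during the pass are never picked up again in the same pass.

use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(PartialEq, Eq)]
struct Entry {
    lsn: u64,
    generation: u64,
}

// Reverse the comparison so BinaryHeap (a max-heap) behaves as a min-heap on `lsn`,
// breaking ties by generation, mirroring the OpenSegEntry ordering in the patch.
impl Ord for Entry {
    fn cmp(&self, other: &Self) -> Ordering {
        other
            .lsn
            .cmp(&self.lsn)
            .then_with(|| other.generation.cmp(&self.generation))
    }
}
impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    let mut current_generation = 0u64;

    // Entries that were already open before the pass started.
    for lsn in [10u64, 20, 30] {
        heap.push(Entry { lsn, generation: current_generation });
    }

    // Start of the pass: bump the generation, like increment_generation() does.
    current_generation += 1;

    while let Some(oldest) = heap.peek() {
        // Stop once the top of the heap was inserted by this pass; processing it
        // again would otherwise loop forever.
        if oldest.generation == current_generation {
            break;
        }
        let entry = heap.pop().unwrap();
        println!("flushing entry at lsn {}", entry.lsn);

        // Processing an entry may push a successor back onto the heap; it carries
        // the new generation and is therefore skipped by the check above.
        heap.push(Entry {
            lsn: entry.lsn + 100,
            generation: current_generation,
        });
    }
}

The loop prints the three pre-existing entries and then terminates, even though each step pushes a new entry onto the same heap it is draining.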