diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 3ef9d81a49..47813da404 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -71,7 +71,6 @@ mod storage_layer; use delta_layer::DeltaLayer; use ephemeral_file::is_ephemeral_file; use filename::{DeltaFileName, ImageFileName}; -use global_layer_map::{LayerId, GLOBAL_LAYER_MAP}; use image_layer::ImageLayer; use inmemory_layer::InMemoryLayer; use layer_map::LayerMap; @@ -1161,8 +1160,8 @@ impl LayeredTimeline { // create an ImageLayer struct for each image file. if imgfilename.lsn > disk_consistent_lsn { warn!( - "found future image layer {} on timeline {}", - imgfilename, self.timelineid + "found future image layer {} on timeline {} disk_consistent_lsn is {}", + imgfilename, self.timelineid, disk_consistent_lsn ); rename_to_backup(direntry.path())?; @@ -1185,8 +1184,8 @@ impl LayeredTimeline { // before crash. if deltafilename.end_lsn > disk_consistent_lsn + 1 { warn!( - "found future delta layer {} on timeline {}", - deltafilename, self.timelineid + "found future delta layer {} on timeline {} disk_consistent_lsn is {}", + deltafilename, self.timelineid, disk_consistent_lsn ); rename_to_backup(direntry.path())?; @@ -1382,7 +1381,7 @@ impl LayeredTimeline { self.tenantid, seg, lsn, - lsn, + last_record_lsn, )?; } else { return Ok(open_layer); @@ -1425,7 +1424,7 @@ impl LayeredTimeline { self.timelineid, self.tenantid, start_lsn, - lsn, + last_record_lsn, )?; } else { // New relation. @@ -1436,8 +1435,14 @@ impl LayeredTimeline { lsn ); - layer = - InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?; + layer = InMemoryLayer::create( + self.conf, + self.timelineid, + self.tenantid, + seg, + lsn, + last_record_lsn, + )?; } let layer_rc: Arc = Arc::new(layer); @@ -1454,7 +1459,7 @@ impl LayeredTimeline { // Prevent concurrent checkpoints let _checkpoint_cs = self.checkpoint_cs.lock().unwrap(); - let mut write_guard = self.write_lock.lock().unwrap(); + let write_guard = self.write_lock.lock().unwrap(); let mut layers = self.layers.lock().unwrap(); // Bump the generation number in the layer map, so that we can distinguish @@ -1480,11 +1485,17 @@ impl LayeredTimeline { let mut disk_consistent_lsn = last_record_lsn; let mut layer_paths = Vec::new(); + let mut freeze_end_lsn = Lsn(0); + let mut evicted_layers = Vec::new(); + + // + // Determine which layers we need to evict and calculate max(latest_lsn) + // among those layers. + // while let Some((oldest_layer_id, oldest_layer, oldest_generation)) = layers.peek_oldest_open() { - let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn(); - + let oldest_lsn = oldest_layer.get_oldest_lsn(); // Does this layer need freezing? // // Write out all in-memory layers that contain WAL older than CHECKPOINT_DISTANCE. @@ -1493,28 +1504,60 @@ impl LayeredTimeline { // when we started. We don't want to process layers inserted after we started, to // avoid getting into an infinite loop trying to process again entries that we // inserted ourselves. - let distance = last_record_lsn.widening_sub(oldest_pending_lsn); - if distance < 0 + // + // Once we have decided to write out at least one layer, we must also write out + // any other layers that contain WAL older than the end LSN of the layers we have + // already decided to write out. In other words, we must write out all layers + // whose [oldest_lsn, latest_lsn) range overlaps with any of the other layers + // that we are writing out. Otherwise, when we advance 'disk_consistent_lsn', it's + // ambiguous whether those layers are already durable on disk or not. For example, + // imagine that there are two layers in memory that contain page versions in the + // following LSN ranges: + // + // A: 100-150 + // B: 110-200 + // + // If we flush layer A, we must also flush layer B, because they overlap. If we + // flushed only A, and advanced 'disk_consistent_lsn' to 150, we would break the + // rule that all WAL older than 'disk_consistent_lsn' are durable on disk, because + // B contains some WAL older than 150. On the other hand, if we flushed out A and + // advanced 'disk_consistent_lsn' only up to 110, after crash and restart we would + // delete the first layer because its end LSN is larger than 110. If we changed + // the deletion logic to not delete it, then we would start streaming at 110, and + // process again the WAL records in the range 110-150 that are already in layer A, + // and the WAL processing code does not cope with that. We solve that dilemma by + // insisting that if we write out the first layer, we also write out the second + // layer, and advance disk_consistent_lsn all the way up to 200. + // + let distance = last_record_lsn.widening_sub(oldest_lsn); + if (distance < 0 || distance < checkpoint_distance.into() - || oldest_generation == current_generation + || oldest_generation == current_generation) + && oldest_lsn >= freeze_end_lsn + // this layer intersects with evicted layer and so also need to be evicted { info!( "the oldest layer is now {} which is {} bytes behind last_record_lsn", oldest_layer.filename().display(), distance ); - disk_consistent_lsn = oldest_pending_lsn; + disk_consistent_lsn = oldest_lsn; break; } + let latest_lsn = oldest_layer.get_latest_lsn(); + if latest_lsn > freeze_end_lsn { + freeze_end_lsn = latest_lsn; // calculate max of latest_lsn of the layers we're about to evict + } + layers.remove_open(oldest_layer_id); + evicted_layers.push((oldest_layer_id, oldest_layer)); + } - drop(layers); - drop(write_guard); - - let mut this_layer_paths = self.evict_layer(oldest_layer_id, reconstruct_pages)?; - layer_paths.append(&mut this_layer_paths); - - write_guard = self.write_lock.lock().unwrap(); - layers = self.layers.lock().unwrap(); + // Freeze evicted layers + for (_evicted_layer_id, evicted_layer) in evicted_layers.iter() { + // Mark the layer as no longer accepting writes and record the end_lsn. + // This happens in-place, no new layers are created now. + evicted_layer.freeze(freeze_end_lsn); + layers.insert_historic(evicted_layer.clone()); } // Call unload() on all frozen layers, to release memory. @@ -1527,6 +1570,14 @@ impl LayeredTimeline { drop(layers); drop(write_guard); + // Create delta/image layers for evicted layers + for (_evicted_layer_id, evicted_layer) in evicted_layers.iter() { + let mut this_layer_paths = + self.evict_layer(evicted_layer.clone(), reconstruct_pages)?; + layer_paths.append(&mut this_layer_paths); + } + + // Sync layers if !layer_paths.is_empty() { // We must fsync the timeline dir to ensure the directory entries for // new layer files are durable @@ -1594,52 +1645,29 @@ impl LayeredTimeline { Ok(()) } - fn evict_layer(&self, layer_id: LayerId, reconstruct_pages: bool) -> Result> { - // Mark the layer as no longer accepting writes and record the end_lsn. - // This happens in-place, no new layers are created now. - // We call `get_last_record_lsn` again, which may be different from the - // original load, as we may have released the write lock since then. - - let mut write_guard = self.write_lock.lock().unwrap(); - let mut layers = self.layers.lock().unwrap(); + fn evict_layer( + &self, + layer: Arc, + reconstruct_pages: bool, + ) -> Result> { + let new_historics = layer.write_to_disk(self, reconstruct_pages)?; let mut layer_paths = Vec::new(); + let _write_guard = self.write_lock.lock().unwrap(); + let mut layers = self.layers.lock().unwrap(); - let global_layer_map = GLOBAL_LAYER_MAP.read().unwrap(); - if let Some(oldest_layer) = global_layer_map.get(&layer_id) { - drop(global_layer_map); - oldest_layer.freeze(self.get_last_record_lsn()); + // Finally, replace the frozen in-memory layer with the new on-disk layers + layers.remove_historic(layer); - // The layer is no longer open, update the layer map to reflect this. - // We will replace it with on-disk historics below. - layers.remove_open(layer_id); - layers.insert_historic(oldest_layer.clone()); - - // Write the now-frozen layer to disk. That could take a while, so release the lock while do it - drop(layers); - drop(write_guard); - - let new_historics = oldest_layer.write_to_disk(self, reconstruct_pages)?; - - write_guard = self.write_lock.lock().unwrap(); - layers = self.layers.lock().unwrap(); - - // Finally, replace the frozen in-memory layer with the new on-disk layers - layers.remove_historic(oldest_layer); - - // Add the historics to the LayerMap - for delta_layer in new_historics.delta_layers { - layer_paths.push(delta_layer.path()); - layers.insert_historic(Arc::new(delta_layer)); - } - for image_layer in new_historics.image_layers { - layer_paths.push(image_layer.path()); - layers.insert_historic(Arc::new(image_layer)); - } + // Add the historics to the LayerMap + for delta_layer in new_historics.delta_layers { + layer_paths.push(delta_layer.path()); + layers.insert_historic(Arc::new(delta_layer)); + } + for image_layer in new_historics.image_layers { + layer_paths.push(image_layer.path()); + layers.insert_historic(Arc::new(image_layer)); } - drop(layers); - drop(write_guard); - Ok(layer_paths) } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 6242f0a361..b710ca1c07 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -39,8 +39,20 @@ pub struct InMemoryLayer { /// start_lsn: Lsn, - /// LSN of the oldest page version stored in this layer - oldest_pending_lsn: Lsn, + /// + /// LSN of the oldest page version stored in this layer. + /// + /// This is different from 'start_lsn' in that we enforce that the 'start_lsn' + /// of a layer always matches the 'end_lsn' of its predecessor, even if there + /// are no page versions until at a later LSN. That way you can detect any + /// missing layer files more easily. 'oldest_lsn' is the first page version + /// actually stored in this layer. In the range between 'start_lsn' and + /// 'oldest_lsn', there are no changes to the segment. + /// 'oldest_lsn' is used to adjust 'disk_consistent_lsn' and that is why it should + /// point to the beginning of WAL record. This is the other difference with 'start_lsn' + /// which points to end of WAL record. This is why 'oldest_lsn' can be smaller than 'start_lsn'. + /// + oldest_lsn: Lsn, /// The above fields never change. The parts that do change are in 'inner', /// and protected by mutex. @@ -73,6 +85,14 @@ pub struct InMemoryLayerInner { /// a non-blocky rel, 'seg_sizes' is not used and is always empty. /// seg_sizes: VecMap, + + /// + /// LSN of the newest page version stored in this layer. + /// + /// The difference between 'end_lsn' and 'latest_lsn' is the same as between + /// 'start_lsn' and 'oldest_lsn'. See comments in 'oldest_lsn'. + /// + latest_lsn: Lsn, } impl InMemoryLayerInner { @@ -319,8 +339,13 @@ pub struct LayersOnDisk { impl InMemoryLayer { /// Return the oldest page version that's stored in this layer - pub fn get_oldest_pending_lsn(&self) -> Lsn { - self.oldest_pending_lsn + pub fn get_oldest_lsn(&self) -> Lsn { + self.oldest_lsn + } + + pub fn get_latest_lsn(&self) -> Lsn { + let inner = self.inner.read().unwrap(); + inner.latest_lsn } /// @@ -332,7 +357,7 @@ impl InMemoryLayer { tenantid: ZTenantId, seg: SegmentTag, start_lsn: Lsn, - oldest_pending_lsn: Lsn, + oldest_lsn: Lsn, ) -> Result { trace!( "initializing new empty InMemoryLayer for writing {} on timeline {} at {}", @@ -355,13 +380,14 @@ impl InMemoryLayer { tenantid, seg, start_lsn, - oldest_pending_lsn, + oldest_lsn, incremental: false, inner: RwLock::new(InMemoryLayerInner { end_lsn: None, dropped: false, page_versions: PageVersions::new(file), seg_sizes, + latest_lsn: oldest_lsn, }), }) } @@ -398,6 +424,8 @@ impl InMemoryLayer { let mut inner = self.inner.write().unwrap(); inner.assert_writeable(); + assert!(lsn >= inner.latest_lsn); + inner.latest_lsn = lsn; let old = inner.page_versions.append_or_update_last(blknum, lsn, pv)?; @@ -509,12 +537,11 @@ impl InMemoryLayer { timelineid: ZTimelineId, tenantid: ZTenantId, start_lsn: Lsn, - oldest_pending_lsn: Lsn, + oldest_lsn: Lsn, ) -> Result { let seg = src.get_seg_tag(); - assert!(oldest_pending_lsn.is_aligned()); - assert!(oldest_pending_lsn >= start_lsn); + assert!(oldest_lsn.is_aligned()); trace!( "initializing new InMemoryLayer for writing {} on timeline {} at {}", @@ -538,13 +565,14 @@ impl InMemoryLayer { tenantid, seg, start_lsn, - oldest_pending_lsn, + oldest_lsn, incremental: true, inner: RwLock::new(InMemoryLayerInner { end_lsn: None, dropped: false, page_versions: PageVersions::new(file), seg_sizes, + latest_lsn: oldest_lsn, }), }) } diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs index a91fde5236..fe82fd491c 100644 --- a/pageserver/src/layered_repository/layer_map.rs +++ b/pageserver/src/layered_repository/layer_map.rs @@ -40,7 +40,7 @@ pub struct LayerMap { /// All the layers keyed by segment tag segs: HashMap, - /// All in-memory layers, ordered by 'oldest_pending_lsn' and generation + /// All in-memory layers, ordered by 'oldest_lsn' and generation /// of each layer. This allows easy access to the in-memory layer that /// contains the oldest WAL record. open_layers: BinaryHeap, @@ -83,16 +83,16 @@ impl LayerMap { let layer_id = segentry.update_open(Arc::clone(&layer)); - let oldest_pending_lsn = layer.get_oldest_pending_lsn(); + let oldest_lsn = layer.get_oldest_lsn(); - // After a crash and restart, 'oldest_pending_lsn' of the oldest in-memory + // After a crash and restart, 'oldest_lsn' of the oldest in-memory // layer becomes the WAL streaming starting point, so it better not point // in the middle of a WAL record. - assert!(oldest_pending_lsn.is_aligned()); + assert!(oldest_lsn.is_aligned()); // Also add it to the binary heap let open_layer_entry = OpenLayerEntry { - oldest_pending_lsn: layer.get_oldest_pending_lsn(), + oldest_lsn: layer.get_oldest_lsn(), layer_id, generation: self.current_generation, }; @@ -352,23 +352,23 @@ impl SegEntry { } /// Entry held in LayerMap::open_layers, with boilerplate comparison routines -/// to implement a min-heap ordered by 'oldest_pending_lsn' and 'generation' +/// to implement a min-heap ordered by 'oldest_lsn' and 'generation' /// /// The generation number associated with each entry can be used to distinguish /// recently-added entries (i.e after last call to increment_generation()) from older -/// entries with the same 'oldest_pending_lsn'. +/// entries with the same 'oldest_lsn'. struct OpenLayerEntry { - oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn() + oldest_lsn: Lsn, // copy of layer.get_oldest_lsn() generation: u64, layer_id: LayerId, } impl Ord for OpenLayerEntry { fn cmp(&self, other: &Self) -> Ordering { // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here - // to get that. Entries with identical oldest_pending_lsn are ordered by generation + // to get that. Entries with identical oldest_lsn are ordered by generation other - .oldest_pending_lsn - .cmp(&self.oldest_pending_lsn) + .oldest_lsn + .cmp(&self.oldest_lsn) .then_with(|| other.generation.cmp(&self.generation)) } } @@ -437,7 +437,7 @@ mod tests { conf: &'static PageServerConf, segno: u32, start_lsn: Lsn, - oldest_pending_lsn: Lsn, + oldest_lsn: Lsn, ) -> Arc { Arc::new( InMemoryLayer::create( @@ -449,7 +449,7 @@ mod tests { segno, }, start_lsn, - oldest_pending_lsn, + oldest_lsn, ) .unwrap(), )