diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs
index cab7b3d860..2316acb616 100644
--- a/libs/pageserver_api/src/keyspace.rs
+++ b/libs/pageserver_api/src/keyspace.rs
@@ -104,6 +104,7 @@
 pub struct KeySpaceAccum {
     accum: Option<Range<Key>>,
     ranges: Vec<Range<Key>>,
+    size: u64,
 }
 
 impl KeySpaceAccum {
@@ -111,6 +112,7 @@
         Self {
             accum: None,
             ranges: Vec::new(),
+            size: 0,
         }
     }
 
@@ -121,6 +123,8 @@
 
     #[inline(always)]
     pub fn add_range(&mut self, range: Range<Key>) {
+        self.size += key_range_size(&range) as u64;
+
         match self.accum.as_mut() {
             Some(accum) => {
                 if range.start == accum.end {
@@ -146,6 +150,23 @@
             ranges: self.ranges,
         }
     }
+
+    pub fn consume_keyspace(&mut self) -> KeySpace {
+        if let Some(accum) = self.accum.take() {
+            self.ranges.push(accum);
+        }
+
+        let mut prev_accum = KeySpaceAccum::new();
+        std::mem::swap(self, &mut prev_accum);
+
+        KeySpace {
+            ranges: prev_accum.ranges,
+        }
+    }
+
+    pub fn size(&self) -> u64 {
+        self.size
+    }
 }
 
 ///
@@ -254,6 +275,30 @@ mod tests {
         }
     }
 
+    #[test]
+    fn keyspace_consume() {
+        let ranges = vec![kr(0..10), kr(20..35), kr(40..45)];
+
+        let mut accum = KeySpaceAccum::new();
+        for range in &ranges {
+            accum.add_range(range.clone());
+        }
+
+        let expected_size: u64 = ranges.iter().map(|r| key_range_size(r) as u64).sum();
+        assert_eq!(accum.size(), expected_size);
+
+        assert_ks_eq(&accum.consume_keyspace(), ranges.clone());
+        assert_eq!(accum.size(), 0);
+
+        assert_ks_eq(&accum.consume_keyspace(), vec![]);
+        assert_eq!(accum.size(), 0);
+
+        for range in &ranges {
+            accum.add_range(range.clone());
+        }
+        assert_ks_eq(&accum.to_keyspace(), ranges);
+    }
+
     #[test]
     fn keyspace_add_range() {
         // two separate ranges
diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs
index e3a7da2ad9..3f37af600d 100644
--- a/libs/pageserver_api/src/reltag.rs
+++ b/libs/pageserver_api/src/reltag.rs
@@ -111,7 +111,19 @@ impl RelTag {
 /// These files are divided into segments, which are divided into
 /// pages of the same BLCKSZ as used for relation files.
 ///
-#[derive(Debug, Clone, Copy, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+#[derive(
+    Debug,
+    Clone,
+    Copy,
+    Hash,
+    Serialize,
+    Deserialize,
+    PartialEq,
+    Eq,
+    PartialOrd,
+    Ord,
+    strum_macros::EnumIter,
+)]
 pub enum SlruKind {
     Clog,
     MultiXactMembers,
diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index 7e5ae892ad..781f4119c3 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -11,8 +11,9 @@
 //! from data stored in object storage.
 //!
 use anyhow::{anyhow, bail, ensure, Context};
-use bytes::{BufMut, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};
 use fail::fail_point;
+use pageserver_api::key::Key;
 use postgres_ffi::pg_constants;
 use std::fmt::Write as FmtWrite;
 use std::time::SystemTime;
@@ -23,7 +24,7 @@ use tracing::*;
 use tokio_tar::{Builder, EntryType, Header};
 
 use crate::context::RequestContext;
-use crate::pgdatadir_mapping::Version;
+use crate::pgdatadir_mapping::{key_to_slru_block, Version};
 use crate::tenant::Timeline;
 use pageserver_api::reltag::{RelTag, SlruKind};
 
@@ -133,6 +134,87 @@ where
     ctx: &'a RequestContext,
 }
 
+/// A sink that accepts SLRU blocks ordered by key and forwards
+/// full segments to the archive.
+struct SlruSegmentsBuilder<'a, 'b, W>
+where
+    W: AsyncWrite + Send + Sync + Unpin,
+{
+    ar: &'a mut Builder<&'b mut W>,
+    buf: Vec<u8>,
+    current_segment: Option<(SlruKind, u32)>,
+}
+
+impl<'a, 'b, W> SlruSegmentsBuilder<'a, 'b, W>
+where
+    W: AsyncWrite + Send + Sync + Unpin,
+{
+    fn new(ar: &'a mut Builder<&'b mut W>) -> Self {
+        Self {
+            ar,
+            buf: Vec::new(),
+            current_segment: None,
+        }
+    }
+
+    async fn add_block(&mut self, key: &Key, block: Bytes) -> anyhow::Result<()> {
+        let (kind, segno, _) = key_to_slru_block(*key)?;
+
+        match kind {
+            SlruKind::Clog => {
+                ensure!(block.len() == BLCKSZ as usize || block.len() == BLCKSZ as usize + 8);
+            }
+            SlruKind::MultiXactMembers | SlruKind::MultiXactOffsets => {
+                ensure!(block.len() == BLCKSZ as usize);
+            }
+        }
+
+        let segment = (kind, segno);
+        match self.current_segment {
+            None => {
+                self.current_segment = Some(segment);
+                self.buf
+                    .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
+            }
+            Some(current_seg) if current_seg == segment => {
+                self.buf
+                    .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
+            }
+            Some(_) => {
+                self.flush().await?;
+
+                self.current_segment = Some(segment);
+                self.buf
+                    .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn flush(&mut self) -> anyhow::Result<()> {
+        let nblocks = self.buf.len() / BLCKSZ as usize;
+        let (kind, segno) = self.current_segment.take().unwrap();
+        let segname = format!("{}/{:>04X}", kind.to_str(), segno);
+        let header = new_tar_header(&segname, self.buf.len() as u64)?;
+        self.ar.append(&header, self.buf.as_slice()).await?;
+
+        trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
+
+        self.buf.clear();
+
+        Ok(())
+    }
+
+    async fn finish(mut self) -> anyhow::Result<()> {
+        if self.current_segment.is_none() || self.buf.is_empty() {
+            return Ok(());
+        }
+
+        self.flush().await
+    }
+}
+
 impl<'a, W> Basebackup<'a, W>
 where
     W: AsyncWrite + Send + Sync + Unpin,
@@ -168,20 +250,27 @@
         }
 
         // Gather non-relational files from object storage pages.
-        for kind in [
-            SlruKind::Clog,
-            SlruKind::MultiXactOffsets,
-            SlruKind::MultiXactMembers,
-        ] {
-            for segno in self
+        let slru_partitions = self
+            .timeline
+            .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
+            .await?
+            .partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64);
+
+        let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
+
+        for part in slru_partitions.parts {
+            let blocks = self
                 .timeline
-                .list_slru_segments(kind, Version::Lsn(self.lsn), self.ctx)
-                .await?
-            {
-                self.add_slru_segment(kind, segno).await?;
+                .get_vectored(&part.ranges, self.lsn, self.ctx)
+                .await?;
+
+            for (key, block) in blocks {
+                slru_builder.add_block(&key, block?).await?;
             }
         }
 
+        slru_builder.finish().await?;
+
         let mut min_restart_lsn: Lsn = Lsn::MAX;
         // Create tablespace directories
         for ((spcnode, dbnode), has_relmap_file) in
@@ -305,39 +394,6 @@
         }
         Ok(())
     }
 
-    //
-    // Generate SLRU segment files from repository.
-    //
-    async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
-        let nblocks = self
-            .timeline
-            .get_slru_segment_size(slru, segno, Version::Lsn(self.lsn), self.ctx)
-            .await?;
-
-        let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
-        for blknum in 0..nblocks {
-            let img = self
-                .timeline
-                .get_slru_page_at_lsn(slru, segno, blknum, self.lsn, self.ctx)
-                .await?;
-
-            if slru == SlruKind::Clog {
-                ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8);
-            } else {
-                ensure!(img.len() == BLCKSZ as usize);
-            }
-
-            slru_buf.extend_from_slice(&img[..BLCKSZ as usize]);
-        }
-
-        let segname = format!("{}/{:>04X}", slru.to_str(), segno);
-        let header = new_tar_header(&segname, slru_buf.len() as u64)?;
-        self.ar.append(&header, slru_buf.as_slice()).await?;
-
-        trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
-        Ok(())
-    }
-
     //
     // Include database/tablespace directories.
     //
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index ae182b8dc6..b65fe1eddd 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -27,6 +27,7 @@ use serde::{Deserialize, Serialize};
 use std::collections::{hash_map, HashMap, HashSet};
 use std::ops::ControlFlow;
 use std::ops::Range;
+use strum::IntoEnumIterator;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, trace, warn};
 use utils::bin_ser::DeserializeError;
@@ -533,6 +534,33 @@ impl Timeline {
         Ok(Default::default())
     }
 
+    pub(crate) async fn get_slru_keyspace(
+        &self,
+        version: Version<'_>,
+        ctx: &RequestContext,
+    ) -> Result<KeySpace, PageReconstructError> {
+        let mut accum = KeySpaceAccum::new();
+
+        for kind in SlruKind::iter() {
+            let mut segments: Vec<u32> = self
+                .list_slru_segments(kind, version, ctx)
+                .await?
+                .into_iter()
+                .collect();
+            segments.sort_unstable();
+
+            for seg in segments {
+                let block_count = self.get_slru_segment_size(kind, seg, version, ctx).await?;
+
+                accum.add_range(
+                    slru_block_to_key(kind, seg, 0)..slru_block_to_key(kind, seg, block_count),
+                );
+            }
+        }
+
+        Ok(accum.to_keyspace())
+    }
+
     /// Get a list of SLRU segments
     pub(crate) async fn list_slru_segments(
         &self,
diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs
index 9b6225501f..c31d401e84 100644
--- a/pageserver/src/tenant/layer_map.rs
+++ b/pageserver/src/tenant/layer_map.rs
@@ -283,15 +283,15 @@ impl LayerMap {
     ///
     /// This is used for garbage collection, to determine if an old layer can
     /// be deleted.
-    pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> Result<bool> {
+    pub fn image_layer_exists(&self, key: &Range<Key>, lsn: &Range<Lsn>) -> bool {
         if key.is_empty() {
             // Vacuously true. There's a newer image for all 0 of the keys in the range.
-            return Ok(true);
+            return true;
         }
 
         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
             Some(v) => v,
-            None => return Ok(false),
+            None => return false,
         };
 
         let start = key.start.to_i128();
@@ -304,17 +304,17 @@
         // Check the start is covered
         if !layer_covers(version.image_coverage.query(start)) {
-            return Ok(false);
+            return false;
         }
 
         // Check after all changes of coverage
         for (_, change_val) in version.image_coverage.range(start..end) {
             if !layer_covers(change_val) {
-                return Ok(false);
+                return false;
             }
         }
 
-        Ok(true)
+        true
     }
 
     pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
@@ -325,18 +325,14 @@
     /// Divide the whole given range of keys into sub-ranges based on the latest
     /// image layer that covers each range at the specified lsn (inclusive).
     /// This is used when creating new image layers.
-    ///
-    // FIXME: clippy complains that the result type is very complex. She's probably
-    // right...
-    #[allow(clippy::type_complexity)]
     pub fn image_coverage(
         &self,
         key_range: &Range<Key>,
         lsn: Lsn,
-    ) -> Result<Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)>> {
+    ) -> Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> {
         let version = match self.historic.get().unwrap().get_version(lsn.0) {
             Some(v) => v,
-            None => return Ok(vec![]),
+            None => return vec![],
         };
 
         let start = key_range.start.to_i128();
@@ -359,7 +355,7 @@
         let kr = Key::from_i128(current_key)..Key::from_i128(end);
         coverage.push((kr, current_val.take()));
 
-        Ok(coverage)
+        coverage
     }
 
     pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
@@ -410,24 +406,19 @@
     /// This number is used to compute the largest number of deltas that
     /// we'll need to visit for any page reconstruction in this region.
     /// We use this heuristic to decide whether to create an image layer.
-    pub fn count_deltas(
-        &self,
-        key: &Range<Key>,
-        lsn: &Range<Lsn>,
-        limit: Option<usize>,
-    ) -> Result<usize> {
+    pub fn count_deltas(&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> usize {
         // We get the delta coverage of the region, and for each part of the coverage
         // we recurse right underneath the delta. The recursion depth is limited by
         // the largest result this function could return, which is in practice between
         // 3 and 10 (since we usually try to create an image when the number gets larger).
         if lsn.is_empty() || key.is_empty() || limit == Some(0) {
-            return Ok(0);
+            return 0;
         }
 
         let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
             Some(v) => v,
-            None => return Ok(0),
+            None => return 0,
         };
 
         let start = key.start.to_i128();
@@ -448,8 +439,7 @@
                 if !kr.is_empty() {
                     let base_count = Self::is_reimage_worthy(&val, key) as usize;
                     let new_limit = limit.map(|l| l - base_count);
-                    let max_stacked_deltas_underneath =
-                        self.count_deltas(&kr, &lr, new_limit)?;
+                    let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
                     max_stacked_deltas = std::cmp::max(
                         max_stacked_deltas,
                         base_count + max_stacked_deltas_underneath,
@@ -471,7 +461,7 @@
             if !kr.is_empty() {
                 let base_count = Self::is_reimage_worthy(&val, key) as usize;
                 let new_limit = limit.map(|l| l - base_count);
-                let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
+                let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit);
                 max_stacked_deltas = std::cmp::max(
                     max_stacked_deltas,
                     base_count + max_stacked_deltas_underneath,
@@ -480,7 +470,7 @@
             }
         }
 
-        Ok(max_stacked_deltas)
+        max_stacked_deltas
     }
 
     /// Count how many reimage-worthy layers we need to visit for given key-lsn pair.
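The basebackup.rs changes above replace the per-segment SLRU reads with a partition-then-vectored-read flow. The sketch below condenses that flow into a free-standing helper for reference; it is illustrative only (the helper name `dump_slru_segments` and its standalone form are not part of this diff), and it assumes the APIs introduced here: `get_slru_keyspace`, the keyspace `partition` call, `Timeline::get_vectored`, and `SlruSegmentsBuilder`.

```rust
// Hypothetical helper mirroring the new SLRU path in basebackup.rs.
// It assumes the imports already present in that file (Timeline, Version,
// Builder, AsyncWrite, BLCKSZ, Lsn, RequestContext, SlruSegmentsBuilder).
async fn dump_slru_segments<'a, 'b, W>(
    timeline: &Timeline,
    lsn: Lsn,
    ctx: &RequestContext,
    ar: &'a mut Builder<&'b mut W>,
) -> anyhow::Result<()>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    // One keyspace covering every SLRU block, split into chunks small enough
    // for the naive vectored read path (at most MAX_GET_VECTORED_KEYS pages).
    let partitions = timeline
        .get_slru_keyspace(Version::Lsn(lsn), ctx)
        .await?
        .partition(Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64);

    // Blocks come back ordered by key, so the builder can emit each SLRU
    // segment as soon as a block for a different segment shows up.
    let mut builder = SlruSegmentsBuilder::new(ar);
    for part in partitions.parts {
        for (key, block) in timeline.get_vectored(&part.ranges, lsn, ctx).await? {
            builder.add_block(&key, block?).await?;
        }
    }
    builder.finish().await
}
```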
@@ -592,10 +582,7 @@ impl LayerMap {
             if limit == Some(difficulty) {
                 break;
             }
-            for (img_range, last_img) in self
-                .image_coverage(range, lsn)
-                .expect("why would this err?")
-            {
+            for (img_range, last_img) in self.image_coverage(range, lsn) {
                 if limit == Some(difficulty) {
                     break;
                 }
@@ -606,9 +593,7 @@
                 };
 
                 if img_lsn < lsn {
-                    let num_deltas = self
-                        .count_deltas(&img_range, &(img_lsn..lsn), limit)
-                        .expect("why would this err lol?");
+                    let num_deltas = self.count_deltas(&img_range, &(img_lsn..lsn), limit);
                     difficulty = std::cmp::max(difficulty, num_deltas);
                 }
             }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 69d0d0b320..bac9bf6573 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -14,6 +14,7 @@ use enumset::EnumSet;
 use fail::fail_point;
 use itertools::Itertools;
 use pageserver_api::{
+    keyspace::{key_range_size, KeySpaceAccum},
     models::{
         DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy,
         LayerMapInfo, TimelineState,
@@ -32,7 +33,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::sync::gate::Gate;
 
-use std::collections::{BinaryHeap, HashMap, HashSet};
+use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
 use std::ops::{Deref, Range};
 use std::pin::pin;
 use std::sync::atomic::Ordering as AtomicOrdering;
@@ -404,6 +405,21 @@ pub(crate) enum PageReconstructError {
     WalRedo(anyhow::Error),
 }
 
+#[derive(thiserror::Error, Debug)]
+enum CreateImageLayersError {
+    #[error("timeline shutting down")]
+    Cancelled,
+
+    #[error(transparent)]
+    GetVectoredError(GetVectoredError),
+
+    #[error(transparent)]
+    PageReconstructError(PageReconstructError),
+
+    #[error(transparent)]
+    Other(#[from] anyhow::Error),
+}
+
 #[derive(thiserror::Error, Debug)]
 enum FlushLayerError {
     /// Timeline cancellation token was cancelled
@@ -411,12 +427,24 @@ enum FlushLayerError {
     Cancelled,
 
     #[error(transparent)]
-    PageReconstructError(#[from] PageReconstructError),
+    CreateImageLayersError(CreateImageLayersError),
 
     #[error(transparent)]
     Other(#[from] anyhow::Error),
 }
 
+#[derive(thiserror::Error, Debug)]
+pub(crate) enum GetVectoredError {
+    #[error("timeline shutting down")]
+    Cancelled,
+
+    #[error("Requested too many keys: {0} > {}", Timeline::MAX_GET_VECTORED_KEYS)]
+    Oversized(u64),
+
+    #[error("Requested at invalid LSN: {0}")]
+    InvalidLsn(Lsn),
+}
+
 #[derive(Clone, Copy)]
 pub enum LogicalSizeCalculationCause {
     Initial,
@@ -456,6 +484,45 @@ pub(crate) enum WaitLsnError {
     Timeout(String),
 }
 
+// The impls below achieve cancellation mapping for errors.
+// Perhaps there's a way of achieving this with less cruft.
+impl From<CreateImageLayersError> for CompactionError {
+    fn from(e: CreateImageLayersError) -> Self {
+        match e {
+            CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
+            _ => CompactionError::Other(e.into()),
+        }
+    }
+}
+
+impl From<CreateImageLayersError> for FlushLayerError {
+    fn from(e: CreateImageLayersError) -> Self {
+        match e {
+            CreateImageLayersError::Cancelled => FlushLayerError::Cancelled,
+            any => FlushLayerError::CreateImageLayersError(any),
+        }
+    }
+}
+
+impl From<PageReconstructError> for CreateImageLayersError {
+    fn from(e: PageReconstructError) -> Self {
+        match e {
+            PageReconstructError::Cancelled => CreateImageLayersError::Cancelled,
+            _ => CreateImageLayersError::PageReconstructError(e),
+        }
+    }
+}
+
+impl From<GetVectoredError> for CreateImageLayersError {
+    fn from(e: GetVectoredError) -> Self {
+        match e {
+            GetVectoredError::Cancelled => CreateImageLayersError::Cancelled,
+            _ => CreateImageLayersError::GetVectoredError(e),
+        }
+    }
+}
+
 /// Public interface functions
 impl Timeline {
     /// Get the LSN where this branch was created
@@ -575,6 +642,53 @@
         res
     }
 
+    pub(crate) const MAX_GET_VECTORED_KEYS: u64 = 32;
+
+    /// Look up multiple page versions at a given LSN
+    ///
+    /// This naive implementation will be replaced with a more efficient one
+    /// which actually vectorizes the read path.
+    pub(crate) async fn get_vectored(
+        &self,
+        key_ranges: &[Range<Key>],
+        lsn: Lsn,
+        ctx: &RequestContext,
+    ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
+        if !lsn.is_valid() {
+            return Err(GetVectoredError::InvalidLsn(lsn));
+        }
+
+        let key_count = key_ranges
+            .iter()
+            .map(|range| key_range_size(range) as u64)
+            .sum();
+        if key_count > Timeline::MAX_GET_VECTORED_KEYS {
+            return Err(GetVectoredError::Oversized(key_count));
+        }
+
+        let mut values = BTreeMap::new();
+        for range in key_ranges {
+            let mut key = range.start;
+            while key != range.end {
+                assert!(!self.shard_identity.is_key_disposable(&key));
+
+                let block = self.get(key, lsn, ctx).await;
+
+                if matches!(
+                    block,
+                    Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
+                ) {
+                    return Err(GetVectoredError::Cancelled);
+                }
+
+                values.insert(key, block);
+                key = key.next();
+            }
+        }
+
+        Ok(values)
+    }
+
     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
     pub fn get_last_record_lsn(&self) -> Lsn {
         self.last_record_lsn.load().last
@@ -2582,7 +2696,7 @@
                     return;
                 }
                 err @ Err(
-                    FlushLayerError::Other(_) | FlushLayerError::PageReconstructError(_),
+                    FlushLayerError::Other(_) | FlushLayerError::CreateImageLayersError(_),
                 ) => {
                     error!("could not flush frozen layer: {err:?}");
                     break err;
@@ -2950,11 +3064,7 @@
     }
 
     // Is it time to create a new image layer for the given partition?
-    async fn time_for_new_image_layer(
-        &self,
-        partition: &KeySpace,
-        lsn: Lsn,
-    ) -> anyhow::Result<bool> {
+    async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
         let threshold = self.get_image_creation_threshold();
 
         let guard = self.layers.read().await;
@@ -2974,20 +3084,20 @@
                 // but the range is already covered by image layers at more recent LSNs. Before we
                 // create a new image layer, check if the range is already covered at more recent LSNs.
                 if !layers
-                    .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
+                    .image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))
                 {
                     debug!(
                         "Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
                         img_range.start, img_range.end, cutoff_lsn, lsn
                     );
-                    return Ok(true);
+                    return true;
                 }
             }
         }
 
         for part_range in &partition.ranges {
-            let image_coverage = layers.image_coverage(part_range, lsn)?;
+            let image_coverage = layers.image_coverage(part_range, lsn);
             for (img_range, last_img) in image_coverage {
                 let img_lsn = if let Some(last_img) = last_img {
                     last_img.get_lsn_range().end
@@ -3008,7 +3118,7 @@
                 // after we read last_record_lsn, which is passed here in the 'lsn' argument.
                 if img_lsn < lsn {
                     let num_deltas =
-                        layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
+                        layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold));
 
                     max_deltas = max_deltas.max(num_deltas);
                     if num_deltas >= threshold {
@@ -3016,7 +3126,7 @@
                             "key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",
                             img_range.start, img_range.end, num_deltas, img_lsn, lsn
                         );
-                        return Ok(true);
+                        return true;
                     }
                 }
             }
@@ -3026,7 +3136,7 @@
             max_deltas,
             "none of the partitioned ranges had >= {threshold} deltas"
         );
-        Ok(false)
+        false
     }
 
     #[tracing::instrument(skip_all, fields(%lsn, %force))]
@@ -3036,7 +3146,7 @@
         lsn: Lsn,
         force: bool,
         ctx: &RequestContext,
-    ) -> Result<Vec<ResidentLayer>, PageReconstructError> {
+    ) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
         let timer = self.metrics.create_images_time_histo.start_timer();
         let mut image_layers = Vec::new();
 
@@ -3054,7 +3164,7 @@
         for partition in partitioning.parts.iter() {
             let img_range = start..partition.ranges.last().unwrap().end;
             start = img_range.end;
-            if force || self.time_for_new_image_layer(partition, lsn).await? {
+            if force || self.time_for_new_image_layer(partition, lsn).await {
                 let mut image_layer_writer = ImageLayerWriter::new(
                     self.conf,
                     self.timeline_id,
@@ -3065,10 +3175,12 @@
                 .await?;
 
                 fail_point!("image-layer-writer-fail-before-finish", |_| {
-                    Err(PageReconstructError::Other(anyhow::anyhow!(
+                    Err(CreateImageLayersError::Other(anyhow::anyhow!(
                         "failpoint image-layer-writer-fail-before-finish"
                     )))
                 });
+
+                let mut key_request_accum = KeySpaceAccum::new();
                 for range in &partition.ranges {
                     let mut key = range.start;
                     while key < range.end {
@@ -3081,34 +3193,55 @@
                             key = key.next();
                             continue;
                         }
-                        let img = match self.get(key, lsn, ctx).await {
-                            Ok(img) => img,
-                            Err(err) => {
-                                // If we fail to reconstruct a VM or FSM page, we can zero the
-                                // page without losing any actual user data. That seems better
-                                // than failing repeatedly and getting stuck.
-                                //
-                                // We had a bug at one point, where we truncated the FSM and VM
-                                // in the pageserver, but the Postgres didn't know about that
-                                // and continued to generate incremental WAL records for pages
-                                // that didn't exist in the pageserver. Trying to replay those
-                                // WAL records failed to find the previous image of the page.
-                                // This special case allows us to recover from that situation.
-                                // See https://github.com/neondatabase/neon/issues/2601.
-                                //
-                                // Unfortunately we cannot do this for the main fork, or for
-                                // any metadata keys, keys, as that would lead to actual data
-                                // loss.
-                                if is_rel_fsm_block_key(key) || is_rel_vm_block_key(key) {
-                                    warn!("could not reconstruct FSM or VM key {key}, filling with zeros: {err:?}");
-                                    ZERO_PAGE.clone()
-                                } else {
-                                    return Err(err);
-                                }
-                            }
-                        };
-                        image_layer_writer.put_image(key, &img).await?;
+                        key_request_accum.add_key(key);
+                        if key_request_accum.size() >= Timeline::MAX_GET_VECTORED_KEYS
+                            || key.next() == range.end
+                        {
+                            let results = self
+                                .get_vectored(
+                                    &key_request_accum.consume_keyspace().ranges,
+                                    lsn,
+                                    ctx,
+                                )
+                                .await?;
+
+                            for (img_key, img) in results {
+                                let img = match img {
+                                    Ok(img) => img,
+                                    Err(err) => {
+                                        // If we fail to reconstruct a VM or FSM page, we can zero the
+                                        // page without losing any actual user data. That seems better
+                                        // than failing repeatedly and getting stuck.
+                                        //
+                                        // We had a bug at one point, where we truncated the FSM and VM
+                                        // in the pageserver, but the Postgres didn't know about that
+                                        // and continued to generate incremental WAL records for pages
+                                        // that didn't exist in the pageserver. Trying to replay those
+                                        // WAL records failed to find the previous image of the page.
+                                        // This special case allows us to recover from that situation.
+                                        // See https://github.com/neondatabase/neon/issues/2601.
+                                        //
+                                        // Unfortunately we cannot do this for the main fork, or for
+                                        // any metadata keys, keys, as that would lead to actual data
+                                        // loss.
+                                        if is_rel_fsm_block_key(img_key)
+                                            || is_rel_vm_block_key(img_key)
+                                        {
+                                            warn!("could not reconstruct FSM or VM key {img_key}, filling with zeros: {err:?}");
+                                            ZERO_PAGE.clone()
+                                        } else {
+                                            return Err(
+                                                CreateImageLayersError::PageReconstructError(err),
+                                            );
+                                        }
+                                    }
+                                };
+
+                                image_layer_writer.put_image(img_key, &img).await?;
+                            }
+                        }
+
                         key = key.next();
                     }
                 }
@@ -3484,7 +3617,7 @@
             // has not so much sense, because largest holes will corresponds field1/field2 changes.
             // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
             // That is why it is better to measure size of hole as number of covering image layers.
-            let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
+            let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
             if coverage_size >= min_hole_coverage_size {
                 heap.push(Hole {
                     key_range,
@@ -4110,7 +4243,7 @@
             // we cannot remove C, even though it's older than 2500, because
            // the delta layer 2000-3000 depends on it.
            if !layers
-                .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))?
+                .image_layer_exists(&l.get_key_range(), &(l.get_lsn_range().end..new_gc_cutoff))
            {
                debug!("keeping {} because it is the latest layer", l.filename());
                // Collect delta key ranges that need image layers to allow garbage
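As a usage note, the naive `get_vectored` introduced above separates request-level failures from per-key failures: an oversized batch, an invalid LSN, or shutdown is reported as `GetVectoredError`, while individual pages may still fail with `PageReconstructError` inside the returned map. The caller sketch below is hedged and illustrative only; the function name, key range, logging, and error handling are not taken from this diff, and it assumes the tracing macros and pageserver types already in scope within the crate.

```rust
// Hypothetical caller inside the pageserver crate; `start_key..end_key` must
// cover at most Timeline::MAX_GET_VECTORED_KEYS keys or the whole request is
// rejected with GetVectoredError::Oversized.
async fn read_small_range(
    timeline: &Timeline,
    start_key: Key,
    end_key: Key,
    lsn: Lsn,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    match timeline.get_vectored(&[start_key..end_key], lsn, ctx).await {
        Ok(blocks) => {
            for (key, block) in blocks {
                match block {
                    Ok(img) => trace!("got {} bytes for {}", img.len(), key),
                    // A single page can fail to reconstruct without failing
                    // the whole request.
                    Err(err) => warn!("could not reconstruct {key}: {err:?}"),
                }
            }
            Ok(())
        }
        // Request-level errors: split the batch (e.g. with KeySpaceAccum, as
        // create_image_layers does above) or propagate.
        Err(GetVectoredError::Oversized(n)) => {
            anyhow::bail!("batch of {n} keys exceeds MAX_GET_VECTORED_KEYS")
        }
        Err(err) => Err(err.into()),
    }
}
```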