diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 8aa96dd672..e663060d17 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -602,28 +602,36 @@ impl Timeline { let n_blocks = self .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx) .await?; - let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize); - for blkno in 0..n_blocks { - let block = self - .get_slru_page_at_lsn(kind, segno, blkno, lsn, ctx) - .await?; - segment.extend_from_slice(&block[..BLCKSZ as usize]); - } - Ok(segment.freeze()) - } - /// Look up given SLRU page version. - pub(crate) async fn get_slru_page_at_lsn( - &self, - kind: SlruKind, - segno: u32, - blknum: BlockNumber, - lsn: Lsn, - ctx: &RequestContext, - ) -> Result { - assert!(self.tenant_shard_id.is_shard_zero()); - let key = slru_block_to_key(kind, segno, blknum); - self.get(key, lsn, ctx).await + let keyspace = KeySpace::single( + slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, n_blocks), + ); + + let batches = keyspace.partition( + self.get_shard_identity(), + Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64, + ); + + let io_concurrency = IoConcurrency::spawn_from_conf( + self.conf, + self.gate + .enter() + .map_err(|_| PageReconstructError::Cancelled)?, + ); + + let mut segment = BytesMut::with_capacity(n_blocks as usize * BLCKSZ as usize); + for batch in batches.parts { + let blocks = self + .get_vectored(batch, lsn, io_concurrency.clone(), ctx) + .await?; + + for (_key, block) in blocks { + let block = block?; + segment.extend_from_slice(&block[..BLCKSZ as usize]); + } + } + + Ok(segment.freeze()) } /// Get size of an SLRU segment @@ -832,19 +840,41 @@ impl Timeline { let nblocks = self .get_slru_segment_size(SlruKind::Clog, segno, Version::Lsn(probe_lsn), ctx) .await?; - for blknum in (0..nblocks).rev() { - let clog_page = self - .get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn, ctx) + + let keyspace = KeySpace::single( + slru_block_to_key(SlruKind::Clog, segno, 0) + ..slru_block_to_key(SlruKind::Clog, segno, nblocks), + ); + + let batches = keyspace.partition( + self.get_shard_identity(), + Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64, + ); + + let io_concurrency = IoConcurrency::spawn_from_conf( + self.conf, + self.gate + .enter() + .map_err(|_| PageReconstructError::Cancelled)?, + ); + + for batch in batches.parts.into_iter().rev() { + let blocks = self + .get_vectored(batch, probe_lsn, io_concurrency.clone(), ctx) .await?; - if clog_page.len() == BLCKSZ as usize + 8 { - let mut timestamp_bytes = [0u8; 8]; - timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]); - let timestamp = TimestampTz::from_be_bytes(timestamp_bytes); + for (_key, clog_page) in blocks.into_iter().rev() { + let clog_page = clog_page?; - match f(timestamp) { - ControlFlow::Break(b) => return Ok(b), - ControlFlow::Continue(()) => (), + if clog_page.len() == BLCKSZ as usize + 8 { + let mut timestamp_bytes = [0u8; 8]; + timestamp_bytes.copy_from_slice(&clog_page[BLCKSZ as usize..]); + let timestamp = TimestampTz::from_be_bytes(timestamp_bytes); + + match f(timestamp) { + ControlFlow::Break(b) => return Ok(b), + ControlFlow::Continue(()) => (), + } } } }