diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 74c9bddfcd..6648487c06 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -46,6 +46,8 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::BytesMut; use camino::{Utf8Path, Utf8PathBuf}; +use futures::pin_mut; +use futures::StreamExt; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; @@ -879,54 +881,51 @@ impl DeltaLayerInner { block_reader, ); + let btree_request_context = RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::DeltaLayerBtreeNode) + .build(); + for range in keyspace.ranges.iter() { let mut range_end_handled = false; let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start); - tree_reader - .visit( - &start_key.0, - VisitDirection::Forwards, - |raw_key, value| { - let key = Key::from_slice(&raw_key[..KEY_SIZE]); - let lsn = DeltaKey::extract_lsn_from_buf(raw_key); - let blob_ref = BlobRef(value); + let index_stream = tree_reader.get_stream_from(&start_key.0, &btree_request_context); + pin_mut!(index_stream); - // Lsns are not monotonically increasing, so we don't assert on them. - assert!(key >= range.start); + while let Some(index_entry) = index_stream.next().await { + let (raw_key, value) = index_entry.map_err(|err| anyhow!(err))?; + let key = Key::from_slice(&raw_key[..KEY_SIZE]); + let lsn = DeltaKey::extract_lsn_from_buf(&raw_key); + let blob_ref = BlobRef(value); - let flag = { - #[allow(clippy::if_same_then_else)] - if lsn >= lsn_range.end || lsn < lsn_range.start { - // If the Lsn is not in the queried range it must be ignored - BlobFlag::Ignore - } else if reconstruct_state.get_cached_lsn(&key) >= Some(lsn) { - // If the Lsn is below the caching line it must be ignored - BlobFlag::Ignore - } else if blob_ref.will_init() { - // This blob will replace all previous blobs for this key - BlobFlag::Replaces - } else { - // Usual path: add blob to the read - BlobFlag::None - } - }; + // Lsns are not monotonically increasing, so we don't assert on them. + assert!(key >= range.start); - if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) { - planner.handle_range_end(blob_ref.pos()); - range_end_handled = true; - false - } else { - planner.handle(key, lsn, blob_ref.pos(), flag); - true - } - }, - &RequestContextBuilder::extend(ctx) - .page_content_kind(PageContentKind::DeltaLayerBtreeNode) - .build(), - ) - .await - .map_err(|err| anyhow!(err))?; + let flag = { + #[allow(clippy::if_same_then_else)] + if lsn >= lsn_range.end || lsn < lsn_range.start { + // If the Lsn is not in the queried range it must be ignored + BlobFlag::Ignore + } else if reconstruct_state.get_cached_lsn(&key) >= Some(lsn) { + // If the Lsn is below the caching line it must be ignored + BlobFlag::Ignore + } else if blob_ref.will_init() { + // This blob will replace all previous blobs for this key + BlobFlag::Replaces + } else { + // Usual path: add blob to the read + BlobFlag::None + } + }; + + if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) { + planner.handle_range_end(blob_ref.pos()); + range_end_handled = true; + break; + } else { + planner.handle(key, lsn, blob_ref.pos(), flag); + } + } if !range_end_handled { let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 56cfaeda15..99a3ef727a 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -43,6 +43,8 @@ use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::{Bytes, BytesMut}; use camino::{Utf8Path, Utf8PathBuf}; +use futures::pin_mut; +use hex; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; @@ -54,6 +56,7 @@ use std::ops::Range; use std::os::unix::prelude::FileExt; use std::sync::Arc; use tokio::sync::OnceCell; +use tokio_stream::StreamExt; use tracing::*; use utils::{ @@ -488,35 +491,33 @@ impl ImageLayerInner { let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader); + let btree_request_context = RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::DeltaLayerBtreeNode) + .build(); + for range in keyspace.ranges.iter() { let mut range_end_handled = false; let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; range.start.write_to_byte_slice(&mut search_key); - tree_reader - .visit( - &search_key, - VisitDirection::Forwards, - |raw_key, offset| { - let key = Key::from_slice(&raw_key[..KEY_SIZE]); - assert!(key >= range.start); + let index_stream = tree_reader.get_stream_from(&search_key, &btree_request_context); + pin_mut!(index_stream); - if key >= range.end { - planner.handle_range_end(offset); - range_end_handled = true; - false - } else { - planner.handle(key, self.lsn, offset, BlobFlag::None); - true - } - }, - &RequestContextBuilder::extend(ctx) - .page_content_kind(PageContentKind::ImageLayerBtreeNode) - .build(), - ) - .await - .map_err(|err| GetVectoredError::Other(anyhow!(err)))?; + while let Some(index_entry) = index_stream.next().await { + let (raw_key, offset) = index_entry.map_err(|err| anyhow!(err))?; + + let key = Key::from_slice(&raw_key[..KEY_SIZE]); + assert!(key >= range.start); + + if key >= range.end { + planner.handle_range_end(offset); + range_end_handled = true; + break; + } else { + planner.handle(key, self.lsn, offset, BlobFlag::None); + } + } if !range_end_handled { let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;