mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 12:10:37 +00:00
pageserver: refactor delta and img index search to use stream
This commit is contained in:
@@ -46,6 +46,8 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::pin_mut;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -879,54 +881,51 @@ impl DeltaLayerInner {
|
||||
block_reader,
|
||||
);
|
||||
|
||||
let btree_request_context = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
|
||||
tree_reader
|
||||
.visit(
|
||||
&start_key.0,
|
||||
VisitDirection::Forwards,
|
||||
|raw_key, value| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
let index_stream = tree_reader.get_stream_from(&start_key.0, &btree_request_context);
|
||||
pin_mut!(index_stream);
|
||||
|
||||
// Lsns are not monotonically increasing, so we don't assert on them.
|
||||
assert!(key >= range.start);
|
||||
while let Some(index_entry) = index_stream.next().await {
|
||||
let (raw_key, value) = index_entry.map_err(|err| anyhow!(err))?;
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
|
||||
let flag = {
|
||||
#[allow(clippy::if_same_then_else)]
|
||||
if lsn >= lsn_range.end || lsn < lsn_range.start {
|
||||
// If the Lsn is not in the queried range it must be ignored
|
||||
BlobFlag::Ignore
|
||||
} else if reconstruct_state.get_cached_lsn(&key) >= Some(lsn) {
|
||||
// If the Lsn is below the caching line it must be ignored
|
||||
BlobFlag::Ignore
|
||||
} else if blob_ref.will_init() {
|
||||
// This blob will replace all previous blobs for this key
|
||||
BlobFlag::Replaces
|
||||
} else {
|
||||
// Usual path: add blob to the read
|
||||
BlobFlag::None
|
||||
}
|
||||
};
|
||||
// Lsns are not monotonically increasing, so we don't assert on them.
|
||||
assert!(key >= range.start);
|
||||
|
||||
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
|
||||
planner.handle_range_end(blob_ref.pos());
|
||||
range_end_handled = true;
|
||||
false
|
||||
} else {
|
||||
planner.handle(key, lsn, blob_ref.pos(), flag);
|
||||
true
|
||||
}
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| anyhow!(err))?;
|
||||
let flag = {
|
||||
#[allow(clippy::if_same_then_else)]
|
||||
if lsn >= lsn_range.end || lsn < lsn_range.start {
|
||||
// If the Lsn is not in the queried range it must be ignored
|
||||
BlobFlag::Ignore
|
||||
} else if reconstruct_state.get_cached_lsn(&key) >= Some(lsn) {
|
||||
// If the Lsn is below the caching line it must be ignored
|
||||
BlobFlag::Ignore
|
||||
} else if blob_ref.will_init() {
|
||||
// This blob will replace all previous blobs for this key
|
||||
BlobFlag::Replaces
|
||||
} else {
|
||||
// Usual path: add blob to the read
|
||||
BlobFlag::None
|
||||
}
|
||||
};
|
||||
|
||||
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
|
||||
planner.handle_range_end(blob_ref.pos());
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(key, lsn, blob_ref.pos(), flag);
|
||||
}
|
||||
}
|
||||
|
||||
if !range_end_handled {
|
||||
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
@@ -43,6 +43,8 @@ use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::pin_mut;
|
||||
use hex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -54,6 +56,7 @@ use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::*;
|
||||
|
||||
use utils::{
|
||||
@@ -488,35 +491,33 @@ impl ImageLayerInner {
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
|
||||
|
||||
let btree_request_context = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
range.start.write_to_byte_slice(&mut search_key);
|
||||
|
||||
tree_reader
|
||||
.visit(
|
||||
&search_key,
|
||||
VisitDirection::Forwards,
|
||||
|raw_key, offset| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
assert!(key >= range.start);
|
||||
let index_stream = tree_reader.get_stream_from(&search_key, &btree_request_context);
|
||||
pin_mut!(index_stream);
|
||||
|
||||
if key >= range.end {
|
||||
planner.handle_range_end(offset);
|
||||
range_end_handled = true;
|
||||
false
|
||||
} else {
|
||||
planner.handle(key, self.lsn, offset, BlobFlag::None);
|
||||
true
|
||||
}
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
|
||||
while let Some(index_entry) = index_stream.next().await {
|
||||
let (raw_key, offset) = index_entry.map_err(|err| anyhow!(err))?;
|
||||
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
assert!(key >= range.start);
|
||||
|
||||
if key >= range.end {
|
||||
planner.handle_range_end(offset);
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(key, self.lsn, offset, BlobFlag::None);
|
||||
}
|
||||
}
|
||||
|
||||
if !range_end_handled {
|
||||
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
Reference in New Issue
Block a user