mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 04:00:38 +00:00
Compare commits
7 Commits
use_debug_
...
vlad/vecto
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
64ad7658e3 | ||
|
|
cffe724b01 | ||
|
|
2d7432231f | ||
|
|
58f00b83c1 | ||
|
|
0870dafc32 | ||
|
|
024f2923a6 | ||
|
|
7debd6162c |
@@ -738,6 +738,8 @@ pub enum PagestreamFeMessage {
|
||||
GetPage(PagestreamGetPageRequest),
|
||||
DbSize(PagestreamDbSizeRequest),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentRequest),
|
||||
GetVectoredPages(PagestreamGetVectoredPagesRequest),
|
||||
GetControlFileValues(PagestreamGetControlFileValuesRequest),
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
@@ -749,6 +751,8 @@ pub enum PagestreamBeMessage {
|
||||
Error(PagestreamErrorResponse),
|
||||
DbSize(PagestreamDbSizeResponse),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentResponse),
|
||||
GetVectoredPages(PagestreamGetVectoredPagesResponse),
|
||||
GetControlFileValues(PagestreamGetControlFileValuesResponse),
|
||||
}
|
||||
|
||||
// Keep in sync with `pagestore_client.h`
|
||||
@@ -760,6 +764,8 @@ enum PagestreamBeMessageTag {
|
||||
Error = 103,
|
||||
DbSize = 104,
|
||||
GetSlruSegment = 105,
|
||||
GetVectoredPages = 106,
|
||||
GetControlFileValues = 107,
|
||||
}
|
||||
impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
type Error = u8;
|
||||
@@ -771,6 +777,7 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
103 => Ok(PagestreamBeMessageTag::Error),
|
||||
104 => Ok(PagestreamBeMessageTag::DbSize),
|
||||
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
|
||||
106 => Ok(PagestreamBeMessageTag::GetVectoredPages),
|
||||
_ => Err(value),
|
||||
}
|
||||
}
|
||||
@@ -813,6 +820,20 @@ pub struct PagestreamGetSlruSegmentRequest {
|
||||
pub segno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamGetVectoredPagesRequest {
|
||||
pub latest: bool,
|
||||
pub lsn: Lsn,
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
pub count: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamGetControlFileValuesRequest {
|
||||
pub lsn: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamExistsResponse {
|
||||
pub exists: bool,
|
||||
@@ -833,6 +854,15 @@ pub struct PagestreamGetSlruSegmentResponse {
|
||||
pub segment: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetVectoredPagesResponse {
|
||||
pub page_count: u8,
|
||||
pub pages: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetControlFileValuesResponse {}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamErrorResponse {
|
||||
pub message: String,
|
||||
@@ -904,6 +934,22 @@ impl PagestreamFeMessage {
|
||||
bytes.put_u8(req.kind);
|
||||
bytes.put_u32(req.segno);
|
||||
}
|
||||
|
||||
Self::GetVectoredPages(req) => {
|
||||
bytes.put_u8(5);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
bytes.put_u32(req.blkno);
|
||||
bytes.put_u8(req.count);
|
||||
}
|
||||
Self::GetControlFileValues(req) => {
|
||||
bytes.put_u8(6);
|
||||
bytes.put_u64(req.lsn.0);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
@@ -962,6 +1008,25 @@ impl PagestreamFeMessage {
|
||||
segno: body.read_u32::<BigEndian>()?,
|
||||
},
|
||||
)),
|
||||
5 => Ok(PagestreamFeMessage::GetVectoredPages(
|
||||
PagestreamGetVectoredPagesRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
count: body.read_u8()?,
|
||||
},
|
||||
)),
|
||||
6 => Ok(PagestreamFeMessage::GetControlFileValues(
|
||||
PagestreamGetControlFileValuesRequest {
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
},
|
||||
)),
|
||||
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
|
||||
}
|
||||
}
|
||||
@@ -1003,6 +1068,16 @@ impl PagestreamBeMessage {
|
||||
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
|
||||
bytes.put(&resp.segment[..]);
|
||||
}
|
||||
|
||||
Self::GetVectoredPages(resp) => {
|
||||
bytes.put_u8(Tag::GetVectoredPages as u8);
|
||||
bytes.put_u8(resp.page_count);
|
||||
bytes.put(&resp.pages[..]);
|
||||
}
|
||||
|
||||
Self::GetControlFileValues(_resp) => {
|
||||
bytes.put_u8(Tag::GetControlFileValues as u8);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
@@ -1051,6 +1126,18 @@ impl PagestreamBeMessage {
|
||||
segment: segment.into(),
|
||||
})
|
||||
}
|
||||
Tag::GetVectoredPages => {
|
||||
let page_count = buf.read_u8()?;
|
||||
let mut pages = vec![0; page_count as usize * 8192];
|
||||
buf.read_exact(&mut pages)?;
|
||||
Self::GetVectoredPages(PagestreamGetVectoredPagesResponse {
|
||||
page_count,
|
||||
pages: pages.into(),
|
||||
})
|
||||
}
|
||||
Tag::GetControlFileValues => {
|
||||
Self::GetControlFileValues(PagestreamGetControlFileValuesResponse {})
|
||||
}
|
||||
};
|
||||
let remaining = buf.into_inner();
|
||||
if !remaining.is_empty() {
|
||||
@@ -1070,6 +1157,8 @@ impl PagestreamBeMessage {
|
||||
Self::Error(_) => "Error",
|
||||
Self::DbSize(_) => "DbSize",
|
||||
Self::GetSlruSegment(_) => "GetSlruSegment",
|
||||
Self::GetVectoredPages(_) => "GetVectoredPages",
|
||||
Self::GetControlFileValues(_) => "GetControlFileValues",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,8 @@ use futures::SinkExt;
|
||||
use pageserver_api::{
|
||||
models::{
|
||||
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
|
||||
PagestreamGetPageResponse,
|
||||
PagestreamGetPageResponse, PagestreamGetVectoredPagesRequest,
|
||||
PagestreamGetVectoredPagesResponse, PagestreamGetControlFileValuesRequest, PagestreamGetControlFileValuesResponse,
|
||||
},
|
||||
reltag::RelTag,
|
||||
};
|
||||
@@ -157,7 +158,73 @@ impl PagestreamClient {
|
||||
PagestreamBeMessage::Exists(_)
|
||||
| PagestreamBeMessage::Nblocks(_)
|
||||
| PagestreamBeMessage::DbSize(_)
|
||||
| PagestreamBeMessage::GetSlruSegment(_) => {
|
||||
| PagestreamBeMessage::GetSlruSegment(_)
|
||||
| PagestreamBeMessage::GetControlFileValues(_)
|
||||
| PagestreamBeMessage::GetVectoredPages(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
msg.kind()
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn getpages(
|
||||
&mut self,
|
||||
req: PagestreamGetVectoredPagesRequest,
|
||||
) -> anyhow::Result<PagestreamGetVectoredPagesResponse> {
|
||||
let req = PagestreamFeMessage::GetVectoredPages(req);
|
||||
let req: bytes::Bytes = req.serialize();
|
||||
// let mut req = tokio_util::io::ReaderStream::new(&req);
|
||||
let mut req = tokio_stream::once(Ok(req));
|
||||
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next: bytes::Bytes = next.unwrap()?;
|
||||
|
||||
let msg = PagestreamBeMessage::deserialize(next)?;
|
||||
match msg {
|
||||
PagestreamBeMessage::GetVectoredPages(p) => Ok(p),
|
||||
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
|
||||
PagestreamBeMessage::Exists(_)
|
||||
| PagestreamBeMessage::Nblocks(_)
|
||||
| PagestreamBeMessage::DbSize(_)
|
||||
| PagestreamBeMessage::GetSlruSegment(_)
|
||||
| PagestreamBeMessage::GetControlFileValues(_)
|
||||
| PagestreamBeMessage::GetPage(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
msg.kind()
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_control_file(
|
||||
&mut self,
|
||||
req: PagestreamGetControlFileValuesRequest,
|
||||
) -> anyhow::Result<PagestreamGetControlFileValuesResponse> {
|
||||
let req = PagestreamFeMessage::GetControlFileValues(req);
|
||||
let req: bytes::Bytes = req.serialize();
|
||||
// let mut req = tokio_util::io::ReaderStream::new(&req);
|
||||
let mut req = tokio_stream::once(Ok(req));
|
||||
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next: bytes::Bytes = next.unwrap()?;
|
||||
|
||||
let msg = PagestreamBeMessage::deserialize(next)?;
|
||||
match msg {
|
||||
PagestreamBeMessage::GetControlFileValues(p) => Ok(p),
|
||||
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
|
||||
PagestreamBeMessage::Exists(_)
|
||||
| PagestreamBeMessage::Nblocks(_)
|
||||
| PagestreamBeMessage::DbSize(_)
|
||||
| PagestreamBeMessage::GetSlruSegment(_)
|
||||
| PagestreamBeMessage::GetVectoredPages(_)
|
||||
| PagestreamBeMessage::GetPage(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
msg.kind()
|
||||
|
||||
@@ -2,7 +2,7 @@ use anyhow::Context;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::{is_rel_block_key, key_to_rel_block, Key};
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use pageserver_api::models::PagestreamGetPageRequest;
|
||||
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamGetControlFileValuesRequest};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::id::TenantTimelineId;
|
||||
@@ -57,6 +57,8 @@ pub(crate) struct Args {
|
||||
/// [`pageserver_api::models::virtual_file::IoEngineKind`].
|
||||
#[clap(long)]
|
||||
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
|
||||
#[clap(long)]
|
||||
lsn: Option<u64>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
@@ -300,21 +302,14 @@ async fn main_impl(
|
||||
|
||||
let start = Instant::now();
|
||||
let req = {
|
||||
let mut rng = rand::thread_rng();
|
||||
let r = &ranges[weights.sample(&mut rng)];
|
||||
let key: i128 = rng.gen_range(r.start..r.end);
|
||||
let key = Key::from_i128(key);
|
||||
assert!(is_rel_block_key(&key));
|
||||
let (rel_tag, block_no) =
|
||||
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
lsn: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
PagestreamGetControlFileValuesRequest {
|
||||
lsn: Lsn(args.lsn.unwrap()),
|
||||
}
|
||||
};
|
||||
client.getpage(req).await.unwrap();
|
||||
|
||||
|
||||
client.get_control_file(req).await.unwrap();
|
||||
|
||||
let end = Instant::now();
|
||||
live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
|
||||
@@ -17,6 +17,13 @@ use futures::stream::FuturesUnordered;
|
||||
use futures::Stream;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::key::AUX_FILES_KEY;
|
||||
use pageserver_api::key::CONTROLFILE_KEY;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::PagestreamGetControlFileValuesRequest;
|
||||
use pageserver_api::models::PagestreamGetControlFileValuesResponse;
|
||||
use pageserver_api::models::PagestreamGetVectoredPagesRequest;
|
||||
use pageserver_api::models::PagestreamGetVectoredPagesResponse;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
@@ -70,6 +77,7 @@ use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::get_active_tenant_with_timeout;
|
||||
use crate::tenant::mgr::GetActiveTenantError;
|
||||
use crate::tenant::mgr::ShardSelector;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::timeline::WaitLsnError;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
@@ -333,6 +341,10 @@ enum PageStreamError {
|
||||
#[error("Read error")]
|
||||
Read(#[source] PageReconstructError),
|
||||
|
||||
/// Something went wrong reading a page: this likely indicates a pageserver bug
|
||||
#[error("Vectored read error")]
|
||||
VectoredRead(#[source] GetVectoredError),
|
||||
|
||||
/// Ran out of time waiting for an LSN
|
||||
#[error("LSN timeout: {0}")]
|
||||
LsnTimeout(WaitLsnError),
|
||||
@@ -356,6 +368,15 @@ impl From<PageReconstructError> for PageStreamError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetVectoredError> for PageStreamError {
|
||||
fn from(value: GetVectoredError) -> Self {
|
||||
match value {
|
||||
GetVectoredError::Cancelled => Self::Shutdown,
|
||||
e => Self::VectoredRead(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetActiveTimelineError> for PageStreamError {
|
||||
fn from(value: GetActiveTimelineError) -> Self {
|
||||
match value {
|
||||
@@ -665,6 +686,24 @@ impl PageServerHandler {
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetVectoredPages(req) => {
|
||||
let span = tracing::info_span!("handle_get_vectored_pages_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn, req_count = %req.count);
|
||||
(
|
||||
self.handle_get_pages_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetControlFileValues(req) => {
|
||||
let span = tracing::info_span!("handle_get_control_values", req_lsn = %req.lsn);
|
||||
(
|
||||
self.handle_get_control_values(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
span,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
match response {
|
||||
@@ -1159,6 +1198,98 @@ impl PageServerHandler {
|
||||
page,
|
||||
}))
|
||||
}
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_control_values(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamGetControlFileValuesRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
|
||||
let keyspace = KeySpace {
|
||||
ranges: vec![CONTROLFILE_KEY..AUX_FILES_KEY.next()],
|
||||
};
|
||||
let _ = timeline.get_vectored(keyspace, req.lsn, ctx).await;
|
||||
|
||||
Ok(PagestreamBeMessage::GetControlFileValues(
|
||||
PagestreamGetControlFileValuesResponse {},
|
||||
))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_pages_at_lsn_request(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamGetVectoredPagesRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
// This is cheeky and relies on not using sharding :)
|
||||
// A real solution has to split the requested key sequence between shards.
|
||||
let get_page_request = PagestreamGetPageRequest {
|
||||
latest: req.latest,
|
||||
lsn: req.lsn,
|
||||
rel: req.rel,
|
||||
blkno: req.blkno,
|
||||
};
|
||||
|
||||
let timeline = match self.get_cached_timeline_for_page(&get_page_request) {
|
||||
Ok(tl) => tl,
|
||||
Err(key) => {
|
||||
match self
|
||||
.load_timeline_for_page(tenant_id, timeline_id, key)
|
||||
.await
|
||||
{
|
||||
Ok(t) => t,
|
||||
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
//
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
return Err(PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into(),
|
||||
));
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't
|
||||
set_tracing_field_shard_id(timeline);
|
||||
|
||||
let _timer = timeline
|
||||
.query_metrics
|
||||
.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let (page_count, pages_buf) = timeline
|
||||
.get_rel_pages_at_lsn(
|
||||
req.rel,
|
||||
req.blkno,
|
||||
req.count,
|
||||
Version::Lsn(lsn),
|
||||
req.latest,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetVectoredPages(
|
||||
PagestreamGetVectoredPagesResponse {
|
||||
page_count,
|
||||
pages: pages_buf,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_slru_segment_request(
|
||||
|
||||
@@ -11,8 +11,9 @@ use crate::context::RequestContext;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::repository::*;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::{ensure, Context};
|
||||
use anyhow::{anyhow, ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use pageserver_api::key::{
|
||||
@@ -26,7 +27,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use postgres_ffi::{Oid, TimestampTz, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
|
||||
use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use strum::IntoEnumIterator;
|
||||
@@ -198,6 +199,41 @@ impl Timeline {
|
||||
version.get(self, key, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_rel_pages_at_lsn(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
blknum: BlockNumber,
|
||||
count: u8,
|
||||
version: Version<'_>,
|
||||
latest: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(u8, Bytes), GetVectoredError> {
|
||||
if tag.relnode == 0 {
|
||||
return Err(GetVectoredError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
));
|
||||
}
|
||||
|
||||
let nblocks = self
|
||||
.get_rel_size(tag, version, latest, ctx)
|
||||
.await
|
||||
.map_err(|e| GetVectoredError::Other(anyhow!(e)))?;
|
||||
if blknum + (count - 1) as u32 >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag,
|
||||
blknum,
|
||||
version.get_lsn(),
|
||||
nblocks
|
||||
);
|
||||
return Ok((1, ZERO_PAGE.clone()));
|
||||
}
|
||||
|
||||
let start_key = rel_block_to_key(tag, blknum);
|
||||
let end_key = start_key.add(count as u32);
|
||||
version.get_vectored(self, start_key..end_key, ctx).await
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
pub(crate) async fn get_db_size(
|
||||
&self,
|
||||
@@ -1604,6 +1640,55 @@ impl<'a> DatadirModification<'a> {
|
||||
self.tline.get(key, lsn, ctx).await
|
||||
}
|
||||
|
||||
async fn get_vectored(
|
||||
&self,
|
||||
key_range: Range<Key>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
// Have we already updated the same key? Read the latest pending updated
|
||||
// version in that case.
|
||||
//
|
||||
// Note: we don't check pending_deletions. It is an error to request a
|
||||
// value that has been removed, deletion only avoids leaking storage.
|
||||
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
|
||||
let mut keys_in_modification = KeySpaceAccum::new();
|
||||
|
||||
let key = key_range.start;
|
||||
while key != key_range.end {
|
||||
if let Some(values) = self.pending_updates.get(&key) {
|
||||
if let Some((_, value)) = values.last() {
|
||||
keys_in_modification.add_key(key);
|
||||
|
||||
match value {
|
||||
Value::Image(img) => {
|
||||
results.insert(key, Ok(img.clone()));
|
||||
}
|
||||
_ => {
|
||||
results.insert(
|
||||
key,
|
||||
Err(PageReconstructError::from(anyhow::anyhow!(
|
||||
"unexpected pending WAL record"
|
||||
))),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
|
||||
|
||||
let mut keyspace = KeySpace {
|
||||
ranges: vec![key_range],
|
||||
};
|
||||
keyspace.remove_overlapping_with(&keys_in_modification.to_keyspace());
|
||||
|
||||
let pages = self.tline.get_vectored(keyspace, lsn, ctx).await?;
|
||||
results.extend(pages.into_iter());
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
fn put(&mut self, key: Key, val: Value) {
|
||||
let values = self.pending_updates.entry(key).or_default();
|
||||
// Replace the previous value if it exists at the same lsn
|
||||
@@ -1647,6 +1732,43 @@ impl<'a> Version<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_vectored(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
key_range: Range<Key>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(u8, Bytes), GetVectoredError> {
|
||||
let pages = match self {
|
||||
Version::Lsn(lsn) => {
|
||||
timeline
|
||||
.get_vectored(
|
||||
KeySpace {
|
||||
ranges: vec![key_range],
|
||||
},
|
||||
*lsn,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Version::Modified(modification) => modification.get_vectored(key_range, ctx).await,
|
||||
}?;
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
let page_count: u8 = pages.len().try_into().expect("too many pages returned");
|
||||
for page in pages {
|
||||
match page {
|
||||
(_key, Ok(bytes)) => {
|
||||
buf.extend_from_slice(&bytes[..]);
|
||||
}
|
||||
(_key, Err(err)) => {
|
||||
return Err(GetVectoredError::Other(anyhow!(err)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((page_count, buf.freeze()))
|
||||
}
|
||||
|
||||
fn get_lsn(&self) -> Lsn {
|
||||
match self {
|
||||
Version::Lsn(lsn) => *lsn,
|
||||
|
||||
@@ -18,10 +18,19 @@
|
||||
//! - An Iterator interface would be more convenient for the callers than the
|
||||
//! 'visit' function
|
||||
//!
|
||||
use async_stream::try_stream;
|
||||
use byteorder::{ReadBytesExt, BE};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use either::Either;
|
||||
use std::{cmp::Ordering, io, result};
|
||||
use futures::Stream;
|
||||
use hex;
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
io,
|
||||
iter::Rev,
|
||||
ops::{Range, RangeInclusive},
|
||||
result,
|
||||
};
|
||||
use thiserror::Error;
|
||||
use tracing::error;
|
||||
|
||||
@@ -250,6 +259,90 @@ where
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Return a stream which yields all key, value pairs from the index
|
||||
/// starting from the first key greater or equal to `start_key`.
|
||||
///
|
||||
/// Note that this is a copy of [`Self::visit`].
|
||||
/// TODO: Once the sequential read path is removed this will become
|
||||
/// the only index traversal method.
|
||||
pub fn get_stream_from<'a>(
|
||||
&'a self,
|
||||
start_key: &'a [u8; L],
|
||||
ctx: &'a RequestContext,
|
||||
) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
|
||||
try_stream! {
|
||||
let mut stack = Vec::new();
|
||||
stack.push((self.root_blk, None));
|
||||
let block_cursor = self.reader.block_cursor();
|
||||
while let Some((node_blknum, opt_iter)) = stack.pop() {
|
||||
// Locate the node.
|
||||
let node_buf = block_cursor
|
||||
.read_blk(self.start_blk + node_blknum, ctx)
|
||||
.await?;
|
||||
|
||||
let node = OnDiskNode::deparse(node_buf.as_ref())?;
|
||||
let prefix_len = node.prefix_len as usize;
|
||||
let suffix_len = node.suffix_len as usize;
|
||||
|
||||
assert!(node.num_children > 0);
|
||||
|
||||
let mut keybuf = Vec::new();
|
||||
keybuf.extend(node.prefix);
|
||||
keybuf.resize(prefix_len + suffix_len, 0);
|
||||
|
||||
let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
|
||||
iter
|
||||
} else {
|
||||
// Locate the first match
|
||||
let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
|
||||
Ok(idx) => idx,
|
||||
Err(idx) => {
|
||||
if node.level == 0 {
|
||||
// Imagine that the node contains the following keys:
|
||||
//
|
||||
// 1
|
||||
// 3 <-- idx
|
||||
// 5
|
||||
//
|
||||
// If the search key is '2' and there is exact match,
|
||||
// the binary search would return the index of key
|
||||
// '3'. That's cool, '3' is the first key to return.
|
||||
idx
|
||||
} else {
|
||||
// This is an internal page, so each key represents a lower
|
||||
// bound for what's in the child page. If there is no exact
|
||||
// match, we have to return the *previous* entry.
|
||||
//
|
||||
// 1 <-- return this
|
||||
// 3 <-- idx
|
||||
// 5
|
||||
idx.saturating_sub(1)
|
||||
}
|
||||
}
|
||||
};
|
||||
Either::Left(idx..node.num_children.into())
|
||||
};
|
||||
|
||||
// idx points to the first match now. Keep going from there
|
||||
while let Some(idx) = iter.next() {
|
||||
let key_off = idx * suffix_len;
|
||||
let suffix = &node.keys[key_off..key_off + suffix_len];
|
||||
keybuf[prefix_len..].copy_from_slice(suffix);
|
||||
let value = node.value(idx);
|
||||
#[allow(clippy::collapsible_if)]
|
||||
if node.level == 0 {
|
||||
// leaf
|
||||
yield (keybuf.clone(), value.to_u64());
|
||||
} else {
|
||||
stack.push((node_blknum, Some(iter)));
|
||||
stack.push((value.to_blknum(), None));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
|
||||
/// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
|
||||
|
||||
@@ -46,6 +46,8 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::BytesMut;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::pin_mut;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -847,10 +849,33 @@ impl DeltaLayerInner {
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let reads = self
|
||||
.plan_reads(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
self.index_root_blk,
|
||||
block_reader,
|
||||
);
|
||||
|
||||
let planner = VectoredReadPlanner::new(
|
||||
self.max_vectored_read_bytes
|
||||
.expect("Layer is loaded with max vectored bytes config")
|
||||
.0
|
||||
.into(),
|
||||
);
|
||||
|
||||
let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
let reads = Self::plan_reads(
|
||||
keyspace,
|
||||
lsn_range,
|
||||
data_end_offset,
|
||||
index_reader,
|
||||
planner,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
|
||||
self.do_reads_and_update_state(reads, reconstruct_state)
|
||||
.await;
|
||||
@@ -858,73 +883,68 @@ impl DeltaLayerInner {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn plan_reads(
|
||||
&self,
|
||||
// This is public only for testing purposes.
|
||||
pub(crate) async fn plan_reads<Reader>(
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
data_end_offset: u64,
|
||||
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
|
||||
mut planner: VectoredReadPlanner,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<VectoredRead>> {
|
||||
let mut planner = VectoredReadPlanner::new(
|
||||
self.max_vectored_read_bytes
|
||||
.expect("Layer is loaded with max vectored bytes config")
|
||||
.0
|
||||
.into(),
|
||||
);
|
||||
|
||||
let block_reader = FileBlockReader::new(&self.file, self.file_id);
|
||||
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
|
||||
self.index_start_blk,
|
||||
self.index_root_blk,
|
||||
block_reader,
|
||||
);
|
||||
) -> anyhow::Result<Vec<VectoredRead>>
|
||||
where
|
||||
Reader: BlockReader,
|
||||
{
|
||||
let btree_request_context = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
|
||||
tree_reader
|
||||
.visit(
|
||||
&start_key.0,
|
||||
VisitDirection::Forwards,
|
||||
|raw_key, value| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
let index_stream = index_reader.get_stream_from(&start_key.0, &btree_request_context);
|
||||
pin_mut!(index_stream);
|
||||
|
||||
assert!(key >= range.start && lsn >= lsn_range.start);
|
||||
while let Some(index_entry) = index_stream.next().await {
|
||||
let (raw_key, value) = index_entry.map_err(|err| anyhow!(err))?;
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
|
||||
let cached_lsn = reconstruct_state.get_cached_lsn(&key);
|
||||
let flag = {
|
||||
if cached_lsn >= Some(lsn) {
|
||||
BlobFlag::Ignore
|
||||
} else if blob_ref.will_init() {
|
||||
BlobFlag::Replaces
|
||||
} else {
|
||||
BlobFlag::None
|
||||
}
|
||||
};
|
||||
// Lsns are not monotonically increasing, so we don't assert on them.
|
||||
assert!(key >= range.start);
|
||||
|
||||
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
|
||||
planner.handle_range_end(blob_ref.pos());
|
||||
range_end_handled = true;
|
||||
false
|
||||
} else {
|
||||
planner.handle(key, lsn, blob_ref.pos(), flag);
|
||||
true
|
||||
}
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| anyhow!(err))?;
|
||||
let flag = {
|
||||
#[allow(clippy::if_same_then_else)]
|
||||
if lsn >= lsn_range.end || lsn < lsn_range.start {
|
||||
// If the Lsn is not in the queried range it must be ignored
|
||||
BlobFlag::Ignore
|
||||
} else if reconstruct_state.get_cached_lsn(&key) >= Some(lsn) {
|
||||
// If the Lsn is below the caching line it must be ignored
|
||||
BlobFlag::Ignore
|
||||
} else if blob_ref.will_init() {
|
||||
// This blob will replace all previous blobs for this key
|
||||
BlobFlag::Replaces
|
||||
} else {
|
||||
// Usual path: add blob to the read
|
||||
BlobFlag::None
|
||||
}
|
||||
};
|
||||
|
||||
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
|
||||
planner.handle_range_end(blob_ref.pos());
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(key, lsn, blob_ref.pos(), flag);
|
||||
}
|
||||
}
|
||||
|
||||
if !range_end_handled {
|
||||
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
tracing::info!("Handling range end fallback at {}", payload_end);
|
||||
planner.handle_range_end(payload_end);
|
||||
tracing::info!("Handling range end fallback at {}", data_end_offset);
|
||||
planner.handle_range_end(data_end_offset);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1190,3 +1210,131 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
|
||||
self.size
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use super::*;
|
||||
use crate::{
|
||||
context::DownloadBehavior, task_mgr::TaskKind, tenant::disk_btree::tests::TestDisk,
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
struct BlobSpec {
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
at: u64,
|
||||
}
|
||||
|
||||
fn validate(
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
vectored_reads: Vec<VectoredRead>,
|
||||
index_entries: BTreeMap<Key, Vec<Lsn>>,
|
||||
) {
|
||||
let mut planned_blobs = Vec::new();
|
||||
for read in vectored_reads {
|
||||
for (at, meta) in read.blobs_at.as_slice() {
|
||||
planned_blobs.push(BlobSpec {
|
||||
key: meta.key,
|
||||
lsn: meta.lsn,
|
||||
at: *at,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
let mut expected_blobs = Vec::new();
|
||||
let mut disk_offset = 0;
|
||||
for (key, lsns) in index_entries {
|
||||
for lsn in lsns {
|
||||
let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
|
||||
let lsn_included = lsn_range.contains(&lsn);
|
||||
|
||||
if key_included && lsn_included {
|
||||
expected_blobs.push(BlobSpec {
|
||||
key,
|
||||
lsn,
|
||||
at: disk_offset,
|
||||
});
|
||||
}
|
||||
|
||||
disk_offset += 1;
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(planned_blobs, expected_blobs);
|
||||
}
|
||||
|
||||
/// Construct an index for a fictional delta layer and and then
|
||||
/// traverse in order to plan vectored reads for a query. Finally,
|
||||
/// verify that the traversal fed the right index key and value
|
||||
/// pairs into the planner.
|
||||
#[tokio::test]
|
||||
async fn test_delta_layer_index_traversal() {
|
||||
let base_key = Key {
|
||||
field1: 0,
|
||||
field2: 1663,
|
||||
field3: 12972,
|
||||
field4: 16396,
|
||||
field5: 0,
|
||||
field6: 246080,
|
||||
};
|
||||
|
||||
// Populate the index with some entries
|
||||
let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
|
||||
(base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
|
||||
(base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
|
||||
(base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
|
||||
(base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
|
||||
]);
|
||||
|
||||
let mut disk = TestDisk::default();
|
||||
let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);
|
||||
|
||||
let mut disk_offset = 0;
|
||||
for (key, lsns) in &entries {
|
||||
for lsn in lsns {
|
||||
let index_key = DeltaKey::from_key_lsn(key, *lsn);
|
||||
let blob_ref = BlobRef::new(disk_offset, false);
|
||||
writer
|
||||
.append(&index_key.0, blob_ref.0)
|
||||
.expect("In memory disk append should never fail");
|
||||
|
||||
disk_offset += 1;
|
||||
}
|
||||
}
|
||||
|
||||
// Prepare all the arguments for the call into `plan_reads` below
|
||||
let (root_offset, _writer) = writer
|
||||
.finish()
|
||||
.expect("In memory disk finish should never fail");
|
||||
let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
|
||||
let planner = VectoredReadPlanner::new(100);
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
|
||||
|
||||
let keyspace = KeySpace {
|
||||
ranges: vec![
|
||||
base_key..base_key.add(3),
|
||||
base_key.add(3)..base_key.add(100),
|
||||
],
|
||||
};
|
||||
let lsn_range = Lsn(2)..Lsn(40);
|
||||
|
||||
// Plan and validate
|
||||
let vectored_reads = DeltaLayerInner::plan_reads(
|
||||
keyspace.clone(),
|
||||
lsn_range.clone(),
|
||||
disk_offset,
|
||||
reader,
|
||||
planner,
|
||||
&mut reconstruct_state,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.expect("Read planning should not fail");
|
||||
|
||||
validate(keyspace, lsn_range, vectored_reads, entries);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,8 @@ use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use futures::pin_mut;
|
||||
use hex;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::LayerAccessKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
@@ -54,6 +56,7 @@ use std::ops::Range;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::*;
|
||||
|
||||
use utils::{
|
||||
@@ -488,35 +491,33 @@ impl ImageLayerInner {
|
||||
let tree_reader =
|
||||
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
|
||||
|
||||
let btree_request_context = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
range.start.write_to_byte_slice(&mut search_key);
|
||||
|
||||
tree_reader
|
||||
.visit(
|
||||
&search_key,
|
||||
VisitDirection::Forwards,
|
||||
|raw_key, offset| {
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
assert!(key >= range.start);
|
||||
let index_stream = tree_reader.get_stream_from(&search_key, &btree_request_context);
|
||||
pin_mut!(index_stream);
|
||||
|
||||
if key >= range.end {
|
||||
planner.handle_range_end(offset);
|
||||
range_end_handled = true;
|
||||
false
|
||||
} else {
|
||||
planner.handle(key, self.lsn, offset, BlobFlag::None);
|
||||
true
|
||||
}
|
||||
},
|
||||
&RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
|
||||
.build(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
|
||||
while let Some(index_entry) = index_stream.next().await {
|
||||
let (raw_key, offset) = index_entry.map_err(|err| anyhow!(err))?;
|
||||
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
assert!(key >= range.start);
|
||||
|
||||
if key >= range.end {
|
||||
planner.handle_range_end(offset);
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(key, self.lsn, offset, BlobFlag::None);
|
||||
}
|
||||
}
|
||||
|
||||
if !range_end_handled {
|
||||
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
@@ -74,6 +74,8 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
|
||||
}
|
||||
prev = Some(req);
|
||||
}
|
||||
PagestreamFeMessage::GetVectoredPages(_) => {}
|
||||
PagestreamFeMessage::GetControlFileValues(_) => {}
|
||||
PagestreamFeMessage::DbSize(_) => {}
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user