Compare commits

...

7 Commits

Author SHA1 Message Date
Vlad Lazar
64ad7658e3 pagebench 2024-03-04 16:10:23 +00:00
Vlad Lazar
cffe724b01 test: get control values req 2024-03-04 16:04:08 +00:00
Vlad Lazar
2d7432231f GetVectoredPages 2024-03-04 15:37:51 +00:00
Vlad Lazar
58f00b83c1 pageserver: unit test delta layer index traversal 2024-03-04 12:59:40 +00:00
Vlad Lazar
0870dafc32 pageserver: refactor delta and img index search to use stream 2024-03-04 12:59:37 +00:00
Vlad Lazar
024f2923a6 pageserver: add streamed btree index traversal 2024-03-04 12:59:00 +00:00
Vlad Lazar
7debd6162c pageserver: fix vectored delta layer index traversal
There were two issues with it.
1. The `key >= range.start && lsn >= lsn_range.start` assertion
was too aggressive. Lsns are not monotonically increasing in the delta
layer index (keys are though), so we cannot assert on them.
2. Lsns greater or equal to `lsn_range.end` were not skipped. This
caused the query to consider records newer than the request Lsn.
2024-03-04 12:58:08 +00:00
9 changed files with 746 additions and 98 deletions

View File

@@ -738,6 +738,8 @@ pub enum PagestreamFeMessage {
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
GetVectoredPages(PagestreamGetVectoredPagesRequest),
GetControlFileValues(PagestreamGetControlFileValuesRequest),
}
// Wrapped in libpq CopyData
@@ -749,6 +751,8 @@ pub enum PagestreamBeMessage {
Error(PagestreamErrorResponse),
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
GetVectoredPages(PagestreamGetVectoredPagesResponse),
GetControlFileValues(PagestreamGetControlFileValuesResponse),
}
// Keep in sync with `pagestore_client.h`
@@ -760,6 +764,8 @@ enum PagestreamBeMessageTag {
Error = 103,
DbSize = 104,
GetSlruSegment = 105,
GetVectoredPages = 106,
GetControlFileValues = 107,
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
@@ -771,6 +777,7 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
103 => Ok(PagestreamBeMessageTag::Error),
104 => Ok(PagestreamBeMessageTag::DbSize),
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
106 => Ok(PagestreamBeMessageTag::GetVectoredPages),
_ => Err(value),
}
}
@@ -813,6 +820,20 @@ pub struct PagestreamGetSlruSegmentRequest {
pub segno: u32,
}
/// Request for a contiguous run of `count` pages starting at block `blkno`
/// of relation `rel`, read at (or after, see `latest`) LSN `lsn`.
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetVectoredPagesRequest {
    /// When true, the server may read at the latest available LSN rather
    /// than exactly at `lsn`.
    pub latest: bool,
    /// LSN to read at (lower bound when `latest` is set).
    pub lsn: Lsn,
    /// Relation to read from.
    pub rel: RelTag,
    /// First block number of the requested run.
    pub blkno: u32,
    /// Number of consecutive blocks requested; the wire format limits this
    /// to a u8.
    pub count: u8,
}
/// Request to read control-file values at a specific LSN.
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetControlFileValuesRequest {
    /// LSN to read the control-file key range at.
    pub lsn: Lsn,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub exists: bool,
@@ -833,6 +854,15 @@ pub struct PagestreamGetSlruSegmentResponse {
pub segment: Bytes,
}
/// Response to [`PagestreamGetVectoredPagesRequest`]: `page_count` page
/// images concatenated into one buffer, in key order.
#[derive(Debug)]
pub struct PagestreamGetVectoredPagesResponse {
    /// Number of page images contained in `pages`.
    pub page_count: u8,
    /// Concatenated page images (`page_count` pages back to back).
    pub pages: Bytes,
}
/// Response to [`PagestreamGetControlFileValuesRequest`]. Currently carries
/// no payload — only the message tag is sent on the wire.
#[derive(Debug)]
pub struct PagestreamGetControlFileValuesResponse {}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub message: String,
@@ -904,6 +934,22 @@ impl PagestreamFeMessage {
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
Self::GetVectoredPages(req) => {
bytes.put_u8(5);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
bytes.put_u8(req.count);
}
Self::GetControlFileValues(req) => {
bytes.put_u8(6);
bytes.put_u64(req.lsn.0);
}
}
bytes.into()
@@ -962,6 +1008,25 @@ impl PagestreamFeMessage {
segno: body.read_u32::<BigEndian>()?,
},
)),
5 => Ok(PagestreamFeMessage::GetVectoredPages(
PagestreamGetVectoredPagesRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
count: body.read_u8()?,
},
)),
6 => Ok(PagestreamFeMessage::GetControlFileValues(
PagestreamGetControlFileValuesRequest {
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
},
)),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
@@ -1003,6 +1068,16 @@ impl PagestreamBeMessage {
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
bytes.put(&resp.segment[..]);
}
Self::GetVectoredPages(resp) => {
bytes.put_u8(Tag::GetVectoredPages as u8);
bytes.put_u8(resp.page_count);
bytes.put(&resp.pages[..]);
}
Self::GetControlFileValues(_resp) => {
bytes.put_u8(Tag::GetControlFileValues as u8);
}
}
bytes.into()
@@ -1051,6 +1126,18 @@ impl PagestreamBeMessage {
segment: segment.into(),
})
}
Tag::GetVectoredPages => {
let page_count = buf.read_u8()?;
let mut pages = vec![0; page_count as usize * 8192];
buf.read_exact(&mut pages)?;
Self::GetVectoredPages(PagestreamGetVectoredPagesResponse {
page_count,
pages: pages.into(),
})
}
Tag::GetControlFileValues => {
Self::GetControlFileValues(PagestreamGetControlFileValuesResponse {})
}
};
let remaining = buf.into_inner();
if !remaining.is_empty() {
@@ -1070,6 +1157,8 @@ impl PagestreamBeMessage {
Self::Error(_) => "Error",
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
Self::GetVectoredPages(_) => "GetVectoredPages",
Self::GetControlFileValues(_) => "GetControlFileValues",
}
}
}

View File

@@ -4,7 +4,8 @@ use futures::SinkExt;
use pageserver_api::{
models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
PagestreamGetPageResponse,
PagestreamGetPageResponse, PagestreamGetVectoredPagesRequest,
PagestreamGetVectoredPagesResponse, PagestreamGetControlFileValuesRequest, PagestreamGetControlFileValuesResponse,
},
reltag::RelTag,
};
@@ -157,7 +158,73 @@ impl PagestreamClient {
PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetSlruSegment(_) => {
| PagestreamBeMessage::GetSlruSegment(_)
| PagestreamBeMessage::GetControlFileValues(_)
| PagestreamBeMessage::GetVectoredPages(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
msg.kind()
)
}
}
}
/// Send a vectored GetPages request and await the matching response.
///
/// # Errors
/// Returns an error if the connection fails, if the server closes the
/// stream before responding, if the server replies with an error message,
/// or if it replies with an unexpected message kind.
pub async fn getpages(
    &mut self,
    req: PagestreamGetVectoredPagesRequest,
) -> anyhow::Result<PagestreamGetVectoredPagesResponse> {
    let req = PagestreamFeMessage::GetVectoredPages(req);
    let req: bytes::Bytes = req.serialize();
    let mut req = tokio_stream::once(Ok(req));
    self.copy_both.send_all(&mut req).await?;

    // Bug fix: previously `next.unwrap()` panicked if the server closed the
    // stream before responding; surface that as an error instead.
    let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
    let next: bytes::Bytes = next
        .ok_or_else(|| anyhow::anyhow!("connection closed before response to getpages request"))??;

    let msg = PagestreamBeMessage::deserialize(next)?;
    match msg {
        PagestreamBeMessage::GetVectoredPages(p) => Ok(p),
        PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
        PagestreamBeMessage::Exists(_)
        | PagestreamBeMessage::Nblocks(_)
        | PagestreamBeMessage::DbSize(_)
        | PagestreamBeMessage::GetSlruSegment(_)
        | PagestreamBeMessage::GetControlFileValues(_)
        | PagestreamBeMessage::GetPage(_) => {
            // Bug fix: the message previously said "getpage request" although
            // this is the vectored getpages path.
            anyhow::bail!(
                "unexpected be message kind in response to getpages request: {}",
                msg.kind()
            )
        }
    }
}
pub async fn get_control_file(
&mut self,
req: PagestreamGetControlFileValuesRequest,
) -> anyhow::Result<PagestreamGetControlFileValuesResponse> {
let req = PagestreamFeMessage::GetControlFileValues(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.copy_both.send_all(&mut req).await?;
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?;
let msg = PagestreamBeMessage::deserialize(next)?;
match msg {
PagestreamBeMessage::GetControlFileValues(p) => Ok(p),
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
PagestreamBeMessage::Exists(_)
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetSlruSegment(_)
| PagestreamBeMessage::GetVectoredPages(_)
| PagestreamBeMessage::GetPage(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
msg.kind()

View File

@@ -2,7 +2,7 @@ use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver_api::key::{is_rel_block_key, key_to_rel_block, Key};
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::models::PagestreamGetPageRequest;
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamGetControlFileValuesRequest};
use tokio_util::sync::CancellationToken;
use utils::id::TenantTimelineId;
@@ -57,6 +57,8 @@ pub(crate) struct Args {
/// [`pageserver_api::models::virtual_file::IoEngineKind`].
#[clap(long)]
set_io_engine: Option<pageserver_api::models::virtual_file::IoEngineKind>,
#[clap(long)]
lsn: Option<u64>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -300,21 +302,14 @@ async fn main_impl(
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(is_rel_block_key(&key));
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,
PagestreamGetControlFileValuesRequest {
lsn: Lsn(args.lsn.unwrap()),
}
};
client.getpage(req).await.unwrap();
client.get_control_file(req).await.unwrap();
let end = Instant::now();
live_stats.request_done();
ticks_processed += 1;

View File

@@ -17,6 +17,13 @@ use futures::stream::FuturesUnordered;
use futures::Stream;
use futures::StreamExt;
use pageserver_api::key::Key;
use pageserver_api::key::AUX_FILES_KEY;
use pageserver_api::key::CONTROLFILE_KEY;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::PagestreamGetControlFileValuesRequest;
use pageserver_api::models::PagestreamGetControlFileValuesResponse;
use pageserver_api::models::PagestreamGetVectoredPagesRequest;
use pageserver_api::models::PagestreamGetVectoredPagesResponse;
use pageserver_api::models::TenantState;
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
@@ -70,6 +77,7 @@ use crate::tenant::mgr;
use crate::tenant::mgr::get_active_tenant_with_timeout;
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::ShardSelector;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::timeline::WaitLsnError;
use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
@@ -333,6 +341,10 @@ enum PageStreamError {
#[error("Read error")]
Read(#[source] PageReconstructError),
/// Something went wrong reading a page: this likely indicates a pageserver bug
#[error("Vectored read error")]
VectoredRead(#[source] GetVectoredError),
/// Ran out of time waiting for an LSN
#[error("LSN timeout: {0}")]
LsnTimeout(WaitLsnError),
@@ -356,6 +368,15 @@ impl From<PageReconstructError> for PageStreamError {
}
}
/// Map vectored-read failures onto page-stream errors. A cancelled read is
/// treated as an orderly shutdown; everything else is a read failure.
impl From<GetVectoredError> for PageStreamError {
    fn from(value: GetVectoredError) -> Self {
        if matches!(value, GetVectoredError::Cancelled) {
            Self::Shutdown
        } else {
            Self::VectoredRead(value)
        }
    }
}
impl From<GetActiveTimelineError> for PageStreamError {
fn from(value: GetActiveTimelineError) -> Self {
match value {
@@ -665,6 +686,24 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::GetVectoredPages(req) => {
let span = tracing::info_span!("handle_get_vectored_pages_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn, req_count = %req.count);
(
self.handle_get_pages_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
PagestreamFeMessage::GetControlFileValues(req) => {
let span = tracing::info_span!("handle_get_control_values", req_lsn = %req.lsn);
(
self.handle_get_control_values(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
};
match response {
@@ -1159,6 +1198,98 @@ impl PageServerHandler {
page,
}))
}
/// Handle a GetControlFileValues request: perform a vectored read over the
/// control-file / checkpoint key range at the requested LSN.
///
/// The response currently carries no payload, so the read is performed for
/// its effects only. NOTE(review): confirm whether the read values should
/// eventually be returned to the client.
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_control_values(
    &mut self,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    req: &PagestreamGetControlFileValuesRequest,
    ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
    let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;

    // Everything from CONTROLFILE_KEY up to and including AUX_FILES_KEY.
    let keyspace = KeySpace {
        ranges: vec![CONTROLFILE_KEY..AUX_FILES_KEY.next()],
    };

    // Bug fix: the result was previously discarded with `let _ = ...`, so a
    // failed read was reported to the client as success. Propagate errors via
    // `?` (converted through `From<GetVectoredError> for PageStreamError`).
    timeline.get_vectored(keyspace, req.lsn, ctx).await?;

    Ok(PagestreamBeMessage::GetControlFileValues(
        PagestreamGetControlFileValuesResponse {},
    ))
}
/// Handle a vectored GetPages request: resolve the timeline that owns the
/// first requested page, wait for the request LSN, then read `req.count`
/// consecutive blocks and return them as one response.
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_pages_at_lsn_request(
    &mut self,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    req: &PagestreamGetVectoredPagesRequest,
    ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
    // This is cheeky and relies on not using sharding :)
    // A real solution has to split the requested key sequence between shards.
    // We build a single-page request from the first block only to reuse the
    // existing timeline-routing helpers.
    let get_page_request = PagestreamGetPageRequest {
        latest: req.latest,
        lsn: req.lsn,
        rel: req.rel,
        blkno: req.blkno,
    };

    let timeline = match self.get_cached_timeline_for_page(&get_page_request) {
        Ok(tl) => tl,
        Err(key) => {
            match self
                .load_timeline_for_page(tenant_id, timeline_id, key)
                .await
            {
                Ok(t) => t,
                Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
                    // We already know this tenant exists in general, because we resolved it at
                    // start of connection. Getting a NotFound here indicates that the shard containing
                    // the requested page is not present on this node: the client's knowledge of shard->pageserver
                    // mapping is out of date.
                    //
                    // Closing the connection by returning `PageStreamError::Reconnect` has the side effect of
                    // rate-limiting the above message, via the client's reconnect backoff, as well as hopefully
                    // prompting the client to load its updated configuration and talk to a different pageserver.
                    return Err(PageStreamError::Reconnect(
                        "getpage@lsn request routed to wrong shard".into(),
                    ));
                }
                Err(e) => return Err(e.into()),
            }
        }
    };

    // load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't
    set_tracing_field_shard_id(timeline);

    // NOTE(review): this reuses the GetPageAtLsn metric for vectored requests —
    // confirm whether a dedicated SmgrQueryType is wanted.
    let _timer = timeline
        .query_metrics
        .start_timer(metrics::SmgrQueryType::GetPageAtLsn);

    let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
    let lsn =
        Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
            .await?;

    let (page_count, pages_buf) = timeline
        .get_rel_pages_at_lsn(
            req.rel,
            req.blkno,
            req.count,
            Version::Lsn(lsn),
            req.latest,
            ctx,
        )
        .await?;

    Ok(PagestreamBeMessage::GetVectoredPages(
        PagestreamGetVectoredPagesResponse {
            page_count,
            pages: pages_buf,
        },
    ))
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_slru_segment_request(

View File

@@ -11,8 +11,9 @@ use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::repository::*;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::tenant::timeline::GetVectoredError;
use crate::walrecord::NeonWalRecord;
use anyhow::{ensure, Context};
use anyhow::{anyhow, ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use pageserver_api::key::{
@@ -26,7 +27,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
@@ -198,6 +199,41 @@ impl Timeline {
version.get(self, key, ctx).await
}
pub(crate) async fn get_rel_pages_at_lsn(
&self,
tag: RelTag,
blknum: BlockNumber,
count: u8,
version: Version<'_>,
latest: bool,
ctx: &RequestContext,
) -> Result<(u8, Bytes), GetVectoredError> {
if tag.relnode == 0 {
return Err(GetVectoredError::Other(
RelationError::InvalidRelnode.into(),
));
}
let nblocks = self
.get_rel_size(tag, version, latest, ctx)
.await
.map_err(|e| GetVectoredError::Other(anyhow!(e)))?;
if blknum + (count - 1) as u32 >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
return Ok((1, ZERO_PAGE.clone()));
}
let start_key = rel_block_to_key(tag, blknum);
let end_key = start_key.add(count as u32);
version.get_vectored(self, start_key..end_key, ctx).await
}
// Get size of a database in blocks
pub(crate) async fn get_db_size(
&self,
@@ -1604,6 +1640,55 @@ impl<'a> DatadirModification<'a> {
self.tline.get(key, lsn, ctx).await
}
/// Vectored read that is aware of this modification's pending (uncommitted)
/// updates: keys shadowed by a pending image are served from memory, the
/// rest are read from the timeline.
///
/// # Errors
/// Fails if the underlying timeline vectored read fails. Per-key
/// reconstruction failures are reported inside the returned map.
async fn get_vectored(
    &self,
    key_range: Range<Key>,
    ctx: &RequestContext,
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
    // Have we already updated the same key? Read the latest pending updated
    // version in that case.
    //
    // Note: we don't check pending_deletions. It is an error to request a
    // value that has been removed, deletion only avoids leaking storage.
    let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
    let mut keys_in_modification = KeySpaceAccum::new();

    // Bug fix: `key` was never advanced, so this loop spun forever on any
    // non-empty range. Walk every key in [start, end) explicitly.
    let mut key = key_range.start;
    while key != key_range.end {
        if let Some(values) = self.pending_updates.get(&key) {
            if let Some((_, value)) = values.last() {
                keys_in_modification.add_key(key);
                match value {
                    Value::Image(img) => {
                        results.insert(key, Ok(img.clone()));
                    }
                    _ => {
                        // Pending WAL records cannot be materialized here;
                        // surface a per-key reconstruction error instead.
                        results.insert(
                            key,
                            Err(PageReconstructError::from(anyhow::anyhow!(
                                "unexpected pending WAL record"
                            ))),
                        );
                    }
                }
            }
        }
        key = key.next();
    }

    // Read everything not shadowed by a pending update from the timeline.
    let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
    let mut keyspace = KeySpace {
        ranges: vec![key_range],
    };
    keyspace.remove_overlapping_with(&keys_in_modification.to_keyspace());

    let pages = self.tline.get_vectored(keyspace, lsn, ctx).await?;
    results.extend(pages.into_iter());

    Ok(results)
}
fn put(&mut self, key: Key, val: Value) {
let values = self.pending_updates.entry(key).or_default();
// Replace the previous value if it exists at the same lsn
@@ -1647,6 +1732,43 @@ impl<'a> Version<'a> {
}
}
/// Perform a vectored read over `key_range` through this version handle and
/// flatten the per-key results into `(page_count, concatenated_pages)`.
///
/// # Errors
/// Fails if the underlying read fails, if any per-key reconstruction failed,
/// or if more pages were returned than fit in the u8 wire-format count.
async fn get_vectored(
    &self,
    timeline: &Timeline,
    key_range: Range<Key>,
    ctx: &RequestContext,
) -> Result<(u8, Bytes), GetVectoredError> {
    // Route through the modification's pending-update-aware path when we
    // have one; otherwise read straight from the timeline at the LSN.
    let pages = match self {
        Version::Lsn(lsn) => {
            timeline
                .get_vectored(
                    KeySpace {
                        ranges: vec![key_range],
                    },
                    *lsn,
                    ctx,
                )
                .await
        }
        Version::Modified(modification) => modification.get_vectored(key_range, ctx).await,
    }?;

    // Bug fix: `try_into().expect(...)` panicked on oversized results; this
    // runs on the server request path, so return a proper error instead.
    let page_count = u8::try_from(pages.len()).map_err(|_| {
        GetVectoredError::Other(anyhow!("too many pages returned: {}", pages.len()))
    })?;

    // Concatenate the page images in key order into a single buffer.
    let mut buf = BytesMut::new();
    for (_key, page) in pages {
        match page {
            Ok(bytes) => buf.extend_from_slice(&bytes[..]),
            Err(err) => return Err(GetVectoredError::Other(anyhow!(err))),
        }
    }

    Ok((page_count, buf.freeze()))
}
fn get_lsn(&self) -> Lsn {
match self {
Version::Lsn(lsn) => *lsn,

View File

@@ -18,10 +18,19 @@
//! - An Iterator interface would be more convenient for the callers than the
//! 'visit' function
//!
use async_stream::try_stream;
use byteorder::{ReadBytesExt, BE};
use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use std::{cmp::Ordering, io, result};
use futures::Stream;
use hex;
use std::{
cmp::Ordering,
io,
iter::Rev,
ops::{Range, RangeInclusive},
result,
};
use thiserror::Error;
use tracing::error;
@@ -250,6 +259,90 @@ where
Ok(result)
}
/// Return a stream which yields all key, value pairs from the index
/// starting from the first key greater or equal to `start_key`.
///
/// Note that this is a copy of [`Self::visit`].
/// TODO: Once the sequential read path is removed this will become
/// the only index traversal method.
pub fn get_stream_from<'a>(
    &'a self,
    start_key: &'a [u8; L],
    ctx: &'a RequestContext,
) -> impl Stream<Item = std::result::Result<(Vec<u8>, u64), DiskBtreeError>> + 'a {
    try_stream! {
        // Explicit DFS stack of (block number, optional resume iterator).
        // `None` means the node has not yet been positioned on `start_key`.
        let mut stack = Vec::new();
        stack.push((self.root_blk, None));
        let block_cursor = self.reader.block_cursor();
        while let Some((node_blknum, opt_iter)) = stack.pop() {
            // Locate the node.
            let node_buf = block_cursor
                .read_blk(self.start_blk + node_blknum, ctx)
                .await?;

            let node = OnDiskNode::deparse(node_buf.as_ref())?;
            let prefix_len = node.prefix_len as usize;
            let suffix_len = node.suffix_len as usize;

            assert!(node.num_children > 0);

            // Full keys are reconstructed by pairing the node's shared
            // prefix with each entry's suffix.
            let mut keybuf = Vec::new();
            keybuf.extend(node.prefix);
            keybuf.resize(prefix_len + suffix_len, 0);

            let mut iter: Either<Range<usize>, Rev<RangeInclusive<usize>>> = if let Some(iter) = opt_iter {
                iter
            } else {
                // Locate the first match
                let idx = match node.binary_search(start_key, keybuf.as_mut_slice()) {
                    Ok(idx) => idx,
                    Err(idx) => {
                        if node.level == 0 {
                            // Imagine that the node contains the following keys:
                            //
                            // 1
                            // 3 <-- idx
                            // 5
                            //
                            // If the search key is '2' and there is no exact match,
                            // the binary search would return the index of key
                            // '3'. That's cool, '3' is the first key to return.
                            idx
                        } else {
                            // This is an internal page, so each key represents a lower
                            // bound for what's in the child page. If there is no exact
                            // match, we have to return the *previous* entry.
                            //
                            // 1 <-- return this
                            // 3 <-- idx
                            // 5
                            idx.saturating_sub(1)
                        }
                    }
                };
                Either::Left(idx..node.num_children.into())
            };

            // idx points to the first match now. Keep going from there
            while let Some(idx) = iter.next() {
                let key_off = idx * suffix_len;
                let suffix = &node.keys[key_off..key_off + suffix_len];
                keybuf[prefix_len..].copy_from_slice(suffix);
                let value = node.value(idx);
                #[allow(clippy::collapsible_if)]
                if node.level == 0 {
                    // leaf
                    yield (keybuf.clone(), value.to_u64());
                } else {
                    // Internal node: remember where to resume in this node,
                    // then descend into the child block first.
                    stack.push((node_blknum, Some(iter)));
                    stack.push((value.to_blknum(), None));
                    break;
                }
            }
        }
    }
}
///
/// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
/// will be called for every key >= 'search_key' (or <= 'search_key', if scanning

View File

@@ -46,6 +46,8 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use futures::pin_mut;
use futures::StreamExt;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
@@ -847,10 +849,33 @@ impl DeltaLayerInner {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let reads = self
.plan_reads(keyspace, lsn_range, reconstruct_state, ctx)
.await
.map_err(GetVectoredError::Other)?;
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
block_reader,
);
let planner = VectoredReadPlanner::new(
self.max_vectored_read_bytes
.expect("Layer is loaded with max vectored bytes config")
.0
.into(),
);
let data_end_offset = self.index_start_blk as u64 * PAGE_SZ as u64;
let reads = Self::plan_reads(
keyspace,
lsn_range,
data_end_offset,
index_reader,
planner,
reconstruct_state,
ctx,
)
.await
.map_err(GetVectoredError::Other)?;
self.do_reads_and_update_state(reads, reconstruct_state)
.await;
@@ -858,73 +883,68 @@ impl DeltaLayerInner {
Ok(())
}
async fn plan_reads(
&self,
// This is public only for testing purposes.
pub(crate) async fn plan_reads<Reader>(
keyspace: KeySpace,
lsn_range: Range<Lsn>,
data_end_offset: u64,
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
mut planner: VectoredReadPlanner,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<Vec<VectoredRead>> {
let mut planner = VectoredReadPlanner::new(
self.max_vectored_read_bytes
.expect("Layer is loaded with max vectored bytes config")
.0
.into(),
);
let block_reader = FileBlockReader::new(&self.file, self.file_id);
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
self.index_start_blk,
self.index_root_blk,
block_reader,
);
) -> anyhow::Result<Vec<VectoredRead>>
where
Reader: BlockReader,
{
let btree_request_context = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build();
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
tree_reader
.visit(
&start_key.0,
VisitDirection::Forwards,
|raw_key, value| {
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
let lsn = DeltaKey::extract_lsn_from_buf(raw_key);
let blob_ref = BlobRef(value);
let index_stream = index_reader.get_stream_from(&start_key.0, &btree_request_context);
pin_mut!(index_stream);
assert!(key >= range.start && lsn >= lsn_range.start);
while let Some(index_entry) = index_stream.next().await {
let (raw_key, value) = index_entry.map_err(|err| anyhow!(err))?;
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
let blob_ref = BlobRef(value);
let cached_lsn = reconstruct_state.get_cached_lsn(&key);
let flag = {
if cached_lsn >= Some(lsn) {
BlobFlag::Ignore
} else if blob_ref.will_init() {
BlobFlag::Replaces
} else {
BlobFlag::None
}
};
// Lsns are not monotonically increasing, so we don't assert on them.
assert!(key >= range.start);
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
planner.handle_range_end(blob_ref.pos());
range_end_handled = true;
false
} else {
planner.handle(key, lsn, blob_ref.pos(), flag);
true
}
},
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build(),
)
.await
.map_err(|err| anyhow!(err))?;
let flag = {
#[allow(clippy::if_same_then_else)]
if lsn >= lsn_range.end || lsn < lsn_range.start {
// If the Lsn is not in the queried range it must be ignored
BlobFlag::Ignore
} else if reconstruct_state.get_cached_lsn(&key) >= Some(lsn) {
// If the Lsn is below the caching line it must be ignored
BlobFlag::Ignore
} else if blob_ref.will_init() {
// This blob will replace all previous blobs for this key
BlobFlag::Replaces
} else {
// Usual path: add blob to the read
BlobFlag::None
}
};
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
planner.handle_range_end(blob_ref.pos());
range_end_handled = true;
break;
} else {
planner.handle(key, lsn, blob_ref.pos(), flag);
}
}
if !range_end_handled {
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;
tracing::info!("Handling range end fallback at {}", payload_end);
planner.handle_range_end(payload_end);
tracing::info!("Handling range end fallback at {}", data_end_offset);
planner.handle_range_end(data_end_offset);
}
}
@@ -1190,3 +1210,131 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
self.size
}
}
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use super::*;
    use crate::{
        context::DownloadBehavior, task_mgr::TaskKind, tenant::disk_btree::tests::TestDisk,
    };

    /// One expected planned blob: the (key, lsn) index entry and the disk
    /// offset its value points at.
    #[derive(Debug, PartialEq, Eq)]
    struct BlobSpec {
        key: Key,
        lsn: Lsn,
        at: u64,
    }

    /// Check that the planner scheduled exactly those blobs whose key and
    /// lsn fall inside the queried keyspace/lsn range, in index order.
    fn validate(
        keyspace: KeySpace,
        lsn_range: Range<Lsn>,
        vectored_reads: Vec<VectoredRead>,
        index_entries: BTreeMap<Key, Vec<Lsn>>,
    ) {
        // Flatten the planned reads into (key, lsn, offset) triples.
        let mut planned_blobs = Vec::new();
        for read in vectored_reads {
            for (at, meta) in read.blobs_at.as_slice() {
                planned_blobs.push(BlobSpec {
                    key: meta.key,
                    lsn: meta.lsn,
                    at: *at,
                });
            }
        }

        // Recompute the expected set directly from the index definition.
        // Offsets are sequential because the writer stored one unit-sized
        // blob ref per entry.
        let mut expected_blobs = Vec::new();
        let mut disk_offset = 0;
        for (key, lsns) in index_entries {
            for lsn in lsns {
                let key_included = keyspace.ranges.iter().any(|range| range.contains(&key));
                let lsn_included = lsn_range.contains(&lsn);

                if key_included && lsn_included {
                    expected_blobs.push(BlobSpec {
                        key,
                        lsn,
                        at: disk_offset,
                    });
                }

                disk_offset += 1;
            }
        }

        assert_eq!(planned_blobs, expected_blobs);
    }

    /// Construct an index for a fictional delta layer and then
    /// traverse in order to plan vectored reads for a query. Finally,
    /// verify that the traversal fed the right index key and value
    /// pairs into the planner.
    #[tokio::test]
    async fn test_delta_layer_index_traversal() {
        let base_key = Key {
            field1: 0,
            field2: 1663,
            field3: 12972,
            field4: 16396,
            field5: 0,
            field6: 246080,
        };

        // Populate the index with some entries
        let entries: BTreeMap<Key, Vec<Lsn>> = BTreeMap::from([
            (base_key, vec![Lsn(1), Lsn(5), Lsn(25), Lsn(26), Lsn(28)]),
            (base_key.add(1), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
            (base_key.add(2), vec![Lsn(2), Lsn(5), Lsn(10), Lsn(50)]),
            (base_key.add(5), vec![Lsn(10), Lsn(15), Lsn(16), Lsn(20)]),
        ]);

        let mut disk = TestDisk::default();
        let mut writer = DiskBtreeBuilder::<_, DELTA_KEY_SIZE>::new(&mut disk);

        let mut disk_offset = 0;
        for (key, lsns) in &entries {
            for lsn in lsns {
                let index_key = DeltaKey::from_key_lsn(key, *lsn);
                let blob_ref = BlobRef::new(disk_offset, false);
                writer
                    .append(&index_key.0, blob_ref.0)
                    .expect("In memory disk append should never fail");

                disk_offset += 1;
            }
        }

        // Prepare all the arguments for the call into `plan_reads` below
        let (root_offset, _writer) = writer
            .finish()
            .expect("In memory disk finish should never fail");
        let reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(0, root_offset, disk);
        let planner = VectoredReadPlanner::new(100);
        let mut reconstruct_state = ValuesReconstructState::new();
        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        // Two ranges: one fully populated, one mostly past the last key, to
        // exercise both the in-range and range-end handling paths.
        let keyspace = KeySpace {
            ranges: vec![
                base_key..base_key.add(3),
                base_key.add(3)..base_key.add(100),
            ],
        };
        let lsn_range = Lsn(2)..Lsn(40);

        // Plan and validate
        let vectored_reads = DeltaLayerInner::plan_reads(
            keyspace.clone(),
            lsn_range.clone(),
            disk_offset,
            reader,
            planner,
            &mut reconstruct_state,
            &ctx,
        )
        .await
        .expect("Read planning should not fail");

        validate(keyspace, lsn_range, vectored_reads, entries);
    }
}

View File

@@ -43,6 +43,8 @@ use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use futures::pin_mut;
use hex;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
@@ -54,6 +56,7 @@ use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tracing::*;
use utils::{
@@ -488,35 +491,33 @@ impl ImageLayerInner {
let tree_reader =
DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
let btree_request_context = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build();
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;
let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
range.start.write_to_byte_slice(&mut search_key);
tree_reader
.visit(
&search_key,
VisitDirection::Forwards,
|raw_key, offset| {
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
assert!(key >= range.start);
let index_stream = tree_reader.get_stream_from(&search_key, &btree_request_context);
pin_mut!(index_stream);
if key >= range.end {
planner.handle_range_end(offset);
range_end_handled = true;
false
} else {
planner.handle(key, self.lsn, offset, BlobFlag::None);
true
}
},
&RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::ImageLayerBtreeNode)
.build(),
)
.await
.map_err(|err| GetVectoredError::Other(anyhow!(err)))?;
while let Some(index_entry) = index_stream.next().await {
let (raw_key, offset) = index_entry.map_err(|err| anyhow!(err))?;
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
assert!(key >= range.start);
if key >= range.end {
planner.handle_range_end(offset);
range_end_handled = true;
break;
} else {
planner.handle(key, self.lsn, offset, BlobFlag::None);
}
}
if !range_end_handled {
let payload_end = self.index_start_blk as u64 * PAGE_SZ as u64;

View File

@@ -74,6 +74,8 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
}
prev = Some(req);
}
PagestreamFeMessage::GetVectoredPages(_) => {}
PagestreamFeMessage::GetControlFileValues(_) => {}
PagestreamFeMessage::DbSize(_) => {}
};
}