Compare commits

...

7 Commits

Author SHA1 Message Date
Christian Schwarz
29f741e1e9 debounce: actually issue vectored get 2024-09-12 17:46:30 +00:00
Christian Schwarz
5d194c7824 debounce: bounce if shard or effective request_lsn differ 2024-09-12 14:20:32 +00:00
Christian Schwarz
ac2702afd3 deboucner: move decoding into debounce loop 2024-09-12 10:58:09 +00:00
Christian Schwarz
2d6763882e pagebench: fake queue depth of 10 2024-09-12 11:35:00 +01:00
Christian Schwarz
c0c23cde72 debouncer 2024-09-12 11:35:00 +01:00
Christian Schwarz
942bc9544b fixup 2024-09-11 20:04:39 +00:00
Christian Schwarz
02b7cdb305 HACK: instrument page_service to count nonblocking consecutive getpage requests 2024-09-11 19:25:19 +01:00
4 changed files with 521 additions and 158 deletions

View File

@@ -142,11 +142,16 @@ impl PagestreamClient {
) -> anyhow::Result<PagestreamGetPageResponse> {
let req = PagestreamFeMessage::GetPage(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.copy_both.send_all(&mut req).await?;
for i in 0..10 {
let mut req = tokio_stream::once(Ok(req.clone()));
self.copy_both.send_all(&mut req).await?;
}
for i in 0..9 {
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?;
}
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?;

View File

@@ -1185,6 +1185,7 @@ struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
ctx: &'c RequestContext,
start: std::time::Instant,
op: SmgrQueryType,
count: usize,
}
impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
@@ -1212,9 +1213,11 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
elapsed
}
};
self.global_metric.observe(ex_throttled.as_secs_f64());
if let Some(timeline_metric) = self.timeline_metric {
timeline_metric.observe(ex_throttled.as_secs_f64());
for _ in 0..self.count {
self.global_metric.observe(ex_throttled.as_secs_f64());
if let Some(timeline_metric) = self.timeline_metric {
timeline_metric.observe(ex_throttled.as_secs_f64());
}
}
}
}
@@ -1343,6 +1346,14 @@ impl SmgrQueryTimePerTimeline {
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> Option<impl Drop + '_> {
self.start_timer_many(op, 1, ctx)
}
pub(crate) fn start_timer_many<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
count: usize,
ctx: &'c RequestContext,
) -> Option<impl Drop + '_> {
let global_metric = &self.global_metrics[op as usize];
let start = Instant::now();
@@ -1376,6 +1387,7 @@ impl SmgrQueryTimePerTimeline {
ctx,
start,
op,
count,
})
}
}
@@ -3170,6 +3182,16 @@ static TOKIO_EXECUTOR_THREAD_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
.unwrap()
});
pub(crate) static CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM: Lazy<Histogram> =
Lazy::new(|| {
register_histogram!(
"pageserver_consecutive_nonblocking_getpage_requests",
"Number of consecutive nonblocking getpage requests",
(0..=256).map(|x| x as f64).collect::<Vec<f64>>(),
)
.unwrap()
});
pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
static SERIALIZE: std::sync::Mutex<()> = std::sync::Mutex::new(());
let _guard = SERIALIZE.lock().unwrap();

View File

@@ -5,14 +5,14 @@ use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::FutureExt;
use once_cell::sync::OnceCell;
use pageserver_api::models::TenantState;
use once_cell::sync::{Lazy, OnceCell};
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
PagestreamNblocksResponse, PagestreamProtocolVersion,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
PagestreamProtocolVersion,
};
use pageserver_api::shard::TenantShardId;
use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
@@ -43,7 +43,7 @@ use crate::basebackup;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics;
use crate::metrics::{self, CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM};
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
use crate::pgdatadir_mapping::Version;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -58,7 +58,7 @@ use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::SlruKind;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
@@ -577,124 +577,317 @@ impl PageServerHandler {
}
}
loop {
// read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => { msg }
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break,
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => break, // client disconnected
};
let mut batched = None;
'outer: loop {
enum DebouncedFeMessage {
Exists(models::PagestreamExistsRequest),
Nblocks(models::PagestreamNblocksRequest),
GetPage {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
effective_request_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
},
DbSize(models::PagestreamDbSizeRequest),
GetSlruSegment(models::PagestreamGetSlruSegmentRequest),
RespondError(Span, PageStreamError),
}
let mut debounce: Option<std::time::Instant> = None;
// return or `?` on protocol error
// `break EXPR` to stop batching. The EXPR will be the first message in the next batch.
let next_batched: Option<DebouncedFeMessage> = loop {
static BOUNCE_TIMEOUT: Lazy<Duration> = Lazy::new(|| {
utils::env::var::<humantime::Duration, _>("NEON_PAGESERVER_DEBOUNCE")
.unwrap()
.into()
});
let sleep_fut = if let Some(started_at) = debounce {
futures::future::Either::Left(tokio::time::sleep_until(
(started_at + *BOUNCE_TIMEOUT).into(),
))
} else {
futures::future::Either::Right(futures::future::pending())
};
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => {
msg
}
_ = sleep_fut => {
assert!(batched.is_some());
break None;
}
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break 'outer,
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => break 'outer, // client disconnected
};
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
// parse request
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
// parse request
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let this_msg = match neon_fe_msg {
PagestreamFeMessage::Exists(msg) => DebouncedFeMessage::Exists(msg),
PagestreamFeMessage::Nblocks(msg) => DebouncedFeMessage::Nblocks(msg),
PagestreamFeMessage::DbSize(msg) => DebouncedFeMessage::DbSize(msg),
PagestreamFeMessage::GetSlruSegment(msg) => {
DebouncedFeMessage::GetSlruSegment(msg)
}
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
request_lsn,
not_modified_since,
rel,
blkno,
}) => {
let span = tracing::info_span!("handle_get_page_at_lsn_request_batched", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, batch_size = tracing::field::Empty);
let key = rel_block_to_key(rel, blkno);
let shard = match self
.timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.instrument(span.clone())
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(
GetActiveTenantError::NotFound(_),
)) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
break Some(DebouncedFeMessage::RespondError(
span,
PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
),
));
}
Err(e) => break Some(DebouncedFeMessage::RespondError(span, e.into())),
};
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
request_lsn,
not_modified_since,
&shard.get_latest_gc_cutoff_lsn(),
&ctx,
)
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
.await
{
Ok(lsn) => lsn,
Err(e) => {
break Some(DebouncedFeMessage::RespondError(span, e));
}
};
DebouncedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages: smallvec::smallvec![(rel, blkno)],
}
}
};
// check if we can debounce
match (&mut batched, this_msg) {
(None, this_msg) => {
batched = Some(this_msg);
}
(
Some(DebouncedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: accum_pages,
effective_request_lsn: accum_lsn,
}),
DebouncedFeMessage::GetPage {
span: _,
shard: this_shard,
pages: this_pages,
effective_request_lsn: this_lsn,
},
) if async {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logig for keeping responses in order does not support that.
return false;
}
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
return *accum_lsn == this_lsn;
}
.await =>
{
// ok to batch
accum_pages.extend(this_pages);
}
(Some(_), this_msg) => {
// by default, don't continue batching
break Some(this_msg);
}
}
// debounce impl piece
let started_at = debounce.get_or_insert_with(Instant::now);
if started_at.elapsed() > *BOUNCE_TIMEOUT {
break None;
}
};
// invoke handler function
let (handler_result, span) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let (handler_results, span): (
smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]>,
_,
) = match batched.take().expect("loop above ensures this") {
DebouncedFeMessage::Exists(req) => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
smallvec::smallvec![
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await
],
span,
)
}
PagestreamFeMessage::Nblocks(req) => {
DebouncedFeMessage::Nblocks(req) => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
smallvec::smallvec![
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
],
span,
)
}
PagestreamFeMessage::GetPage(req) => {
DebouncedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages,
} => {
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM.observe(pages.len() as f64);
span.record("batch_size", pages.len() as u64);
fail::fail_point!("ps::handle-pagerequest-message::getpage");
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
(
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
{
let npages = pages.len();
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
effective_request_lsn,
pages,
&ctx,
)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
span,
)
}
PagestreamFeMessage::DbSize(req) => {
DebouncedFeMessage::DbSize(req) => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
(
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
smallvec::smallvec![
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await
],
span,
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
DebouncedFeMessage::GetSlruSegment(req) => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
(
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
smallvec::smallvec![
self.handle_get_slru_segment_request(
tenant_id,
timeline_id,
&req,
&ctx
)
.instrument(span.clone())
.await,
.await
],
span,
)
}
DebouncedFeMessage::RespondError(span, e) => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(smallvec::smallvec![Err(e)], span)
}
};
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
let response_msg = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
let full = utils::error::report_compact_sources(&e);
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
}
},
Ok(response_msg) => response_msg,
};
for handler_result in handler_results {
let response_msg = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
let full = utils::error::report_compact_sources(&e);
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
}
},
Ok(response_msg) => response_msg,
};
// marshal & transmit response message
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
// marshal & transmit response message
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
}
tokio::select! {
biased;
_ = self.cancel.cancelled() => {
@@ -706,6 +899,9 @@ impl PageServerHandler {
res?;
}
}
assert!(batched.is_none(), "we take() earlier");
batched = next_batched;
}
Ok(())
}
@@ -949,60 +1145,30 @@ impl PageServerHandler {
}))
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_page_at_lsn_request(
#[instrument(skip_all)]
async fn handle_get_page_at_lsn_request_batched(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamGetPageRequest,
timeline: &Timeline,
effective_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = match self
.timeline_handles
.get(
tenant_id,
timeline_id,
ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
)
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
}
Err(e) => return Err(e.into()),
};
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
) -> smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]> {
debug_assert_current_span_has_tenant_and_timeline_id();
let _timer = timeline.query_metrics.start_timer_many(
metrics::SmgrQueryType::GetPageAtLsn,
pages.len(),
ctx,
)
.await?;
);
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
.await?;
let pages = timeline
.get_rel_page_at_lsn_batched(pages, Version::Lsn(effective_lsn), ctx)
.await;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
smallvec::SmallVec::from_iter(pages.into_iter().map(|page| {
page.map(|page| {
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
})
.map_err(PageStreamError::Read)
}))
}
@@ -1499,3 +1665,10 @@ fn set_tracing_field_shard_id(timeline: &Timeline) {
);
debug_assert_current_span_has_tenant_and_timeline_id();
}
struct WaitedForLsn(Lsn);
impl From<WaitedForLsn> for Lsn {
fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
lsn
}
}

View File

@@ -9,12 +9,17 @@
use super::tenant::{PageReconstructError, Timeline};
use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::timeline::GetVectoredError;
use crate::walrecord::NeonWalRecord;
use crate::{aux_file, repository::*};
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::{
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
@@ -28,7 +33,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
@@ -191,26 +196,184 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
}
let pages = smallvec::smallvec![(tag, blknum)];
let res = self.get_rel_page_at_lsn_batched(pages, version, ctx).await;
assert_eq!(res.len(), 1);
res.into_iter().next().unwrap()
}
let nblocks = self.get_rel_size(tag, version, ctx).await?;
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
return Ok(ZERO_PAGE.clone());
/// Like [`get_rel_page_at_lsn`], but returns a batch of pages.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
version: Version<'_>,
ctx: &RequestContext,
) -> smallvec::SmallVec<[Result<Bytes, PageReconstructError>; 1]> {
debug_assert_current_span_has_tenant_and_timeline_id();
let request_lsn = match version {
Version::Lsn(lsn) => lsn,
Version::Modified(_) => panic!("unsupported"),
};
enum KeyState {
NeedsVectoredGet,
Done(Result<Bytes, PageReconstructError>),
}
let mut key_states = BTreeMap::new();
let mut vectored_gets: smallvec::SmallVec<[_; 1]> =
smallvec::SmallVec::with_capacity(pages.len());
for (response_order, (tag, blknum)) in pages.into_iter().enumerate() {
let key = rel_block_to_key(tag, blknum);
use std::collections::btree_map::Entry;
let key_state_slot = match key_states.entry((key, response_order)) {
Entry::Occupied(_entry) => unreachable!(
"enumerate makes keys unique, even if batch contains same key twice"
),
Entry::Vacant(entry) => entry,
};
let key = rel_block_to_key(tag, blknum);
version.get(self, key, ctx).await
if tag.relnode == 0 {
key_state_slot.insert(KeyState::Done(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
))));
continue;
}
let nblocks = match self.get_rel_size(tag, version, ctx).await {
Ok(nblocks) => nblocks,
Err(err) => {
key_state_slot.insert(KeyState::Done(Err(err)));
continue;
}
};
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
key_state_slot.insert(KeyState::Done(Ok(ZERO_PAGE.clone())));
continue;
}
vectored_gets.push(key);
key_state_slot.insert(KeyState::NeedsVectoredGet);
}
// turn vectored_gets into a keyspace
let keyspace = {
// add_key reuqires monotonicity
vectored_gets.sort_unstable();
let mut acc = KeySpaceAccum::new();
for key in vectored_gets
.into_iter()
// in fact it requires strong monotonicity
.dedup()
{
acc.add_key(key);
}
acc.to_keyspace()
};
match self.get_vectored(keyspace, request_lsn, ctx).await {
Ok(results) => {
for (key, res) in results {
if let Err(err) = &res {
warn!(%key, ?err, "a key inside get_vectored failed with a per-key error");
}
let mut interests = key_states.range_mut((key, 0)..(key.next(), 0)).peekable();
let first_interest = interests.next().unwrap();
let next_interest = interests.peek().is_some();
if !next_interest {
match first_interest.1 {
KeyState::NeedsVectoredGet => {
*first_interest.1 = KeyState::Done(res);
}
KeyState::Done(_) => unreachable!(),
}
continue;
} else {
for ((_, _), state) in [first_interest].into_iter().chain(interests) {
match state {
KeyState::NeedsVectoredGet => {
*state = KeyState::Done(match &res {
Ok(buf) => Ok(buf.clone()),
// this `match` is working around the fact that we cannot Clone the PageReconstructError
Err(err) => Err(match err {
PageReconstructError::Cancelled => {
PageReconstructError::Cancelled
}
x @ PageReconstructError::Other(_) |
x @ PageReconstructError::AncestorLsnTimeout(_) |
x @ PageReconstructError::WalRedo(_) |
x @ PageReconstructError::MissingKey(_) => {
PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
},
}),
});
}
KeyState::Done(_) => unreachable!(),
}
}
}
}
}
Err(err) => {
warn!(?err, "get_vectored failed with a global error, mapping that error to per-key failure");
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
for ((_, _), state) in key_states.iter_mut() {
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
// but without taking ownership of the GetVectoredError
match &err {
GetVectoredError::Cancelled => {
*state = KeyState::Done(Err(PageReconstructError::Cancelled));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::MissingKey(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}"))));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::GetReadyAncestorError(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"))));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::Other(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(
anyhow::anyhow!("whole vectored get request failed: {err:?}"),
)));
}
// TODO: we can prevent this error class by moving this check into the type system
GetVectoredError::InvalidLsn(e) => {
*state =
KeyState::Done(Err(anyhow::anyhow!("invalid LSN: {e:?}").into()));
}
// NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
// TODO: we can prevent this error class by moving this check into the type system
GetVectoredError::Oversized(err) => {
*state = KeyState::Done(Err(anyhow::anyhow!(
"batching oversized: {err:?}"
)
.into()));
}
}
}
}
};
// get the results into the order in which they were requested
let mut return_order: smallvec::SmallVec<[_; Timeline::MAX_GET_VECTORED_KEYS as usize]> =
smallvec::SmallVec::with_capacity(key_states.len());
return_order.extend(key_states.keys().map(|(key, idx)| (*key, *idx)));
return_order.sort_unstable_by_key(|(_, idx)| *idx);
let mut res = smallvec::SmallVec::with_capacity(key_states.len());
res.extend(return_order.into_iter().map(|key_states_key| {
match key_states.remove(&key_states_key).unwrap() {
KeyState::Done(res) => res,
KeyState::NeedsVectoredGet => unreachable!(),
}
}));
res
}
// Get size of a database in blocks