feat(page_service): timeout-based batching of requests (#9321)

## Problem

We don't take advantage of queue depth generated by the compute
on the pageserver. We can process getpage requests more efficiently
by batching them. 

## Summary of changes

Batch up incoming getpage requests that arrive within a configurable
time window (`server_side_batch_timeout`).
Then process the entire batch via one `get_vectored` timeline operation.
By default, no merging takes place.

## Testing

* **Functional**: https://github.com/neondatabase/neon/pull/9792
* **Performance**: will be done in staging/pre-prod

# Refs

* https://github.com/neondatabase/neon/issues/9377
* https://github.com/neondatabase/neon/issues/9376

Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
Vlad Lazar
2024-11-18 20:24:03 +00:00
committed by GitHub
parent e5c89f3da3
commit d7662fdc7b
9 changed files with 706 additions and 196 deletions

1
Cargo.lock generated
View File

@@ -3642,6 +3642,7 @@ dependencies = [
"serde_json",
"serde_path_to_error",
"serde_with",
"smallvec",
"storage_broker",
"strum",
"strum_macros",

View File

@@ -109,6 +109,8 @@ pub struct ConfigToml {
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
#[serde(skip_serializing_if = "Option::is_none")]
pub no_sync: Option<bool>,
#[serde(with = "humantime_serde")]
pub server_side_batch_timeout: Option<Duration>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -317,6 +319,8 @@ pub mod defaults {
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
pub const DEFAULT_SERVER_SIDE_BATCH_TIMEOUT: Option<&str> = None;
}
impl Default for ConfigToml {
@@ -397,6 +401,8 @@ impl Default for ConfigToml {
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
virtual_file_io_mode: None,
server_side_batch_timeout: DEFAULT_SERVER_SIDE_BATCH_TIMEOUT
.map(|duration| humantime::parse_duration(duration).unwrap()),
tenant_config: TenantConfigToml::default(),
no_sync: None,
}

View File

@@ -716,6 +716,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
Ok(())
}
// Proto looks like this:
// FeMessage::Query("pagestream_v2{FeMessage::CopyData(PagesetreamFeMessage::GetPage(..))}")
async fn process_message(
&mut self,
handler: &mut impl Handler<IO>,

View File

@@ -84,6 +84,7 @@ enumset = { workspace = true, features = ["serde"]}
strum.workspace = true
strum_macros.workspace = true
wal_decoder.workspace = true
smallvec.workspace = true
[target.'cfg(target_os = "linux")'.dependencies]
procfs.workspace = true

View File

@@ -182,6 +182,10 @@ pub struct PageServerConf {
/// Optionally disable disk syncs (unsafe!)
pub no_sync: bool,
/// Maximum amount of time for which a get page request request
/// might be held up for request merging.
pub server_side_batch_timeout: Option<Duration>,
}
/// Token for authentication to safekeepers
@@ -336,6 +340,7 @@ impl PageServerConf {
concurrent_tenant_warmup,
concurrent_tenant_size_logical_size_queries,
virtual_file_io_engine,
server_side_batch_timeout,
tenant_config,
no_sync,
} = config_toml;
@@ -377,6 +382,7 @@ impl PageServerConf {
image_compression,
timeline_offloading,
ephemeral_bytes_per_memory_kb,
server_side_batch_timeout,
// ------------------------------------------------------------
// fields that require additional validation or custom handling

View File

@@ -1187,6 +1187,7 @@ struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
ctx: &'c RequestContext,
start: std::time::Instant,
op: SmgrQueryType,
count: usize,
}
impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_> {
@@ -1214,10 +1215,13 @@ impl Drop for GlobalAndPerTimelineHistogramTimer<'_, '_> {
elapsed
}
};
self.global_latency_histo
.observe(ex_throttled.as_secs_f64());
if let Some(per_timeline_getpage_histo) = self.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(ex_throttled.as_secs_f64());
for _ in 0..self.count {
self.global_latency_histo
.observe(ex_throttled.as_secs_f64());
if let Some(per_timeline_getpage_histo) = self.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(ex_throttled.as_secs_f64());
}
}
}
}
@@ -1385,6 +1389,14 @@ impl SmgrQueryTimePerTimeline {
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> Option<impl Drop + 'a> {
self.start_timer_many(op, 1, ctx)
}
pub(crate) fn start_timer_many<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
count: usize,
ctx: &'c RequestContext,
) -> Option<impl Drop + 'a> {
let start = Instant::now();
@@ -1422,6 +1434,7 @@ impl SmgrQueryTimePerTimeline {
ctx,
start,
op,
count,
})
}
}

View File

@@ -7,13 +7,13 @@ use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::TenantState;
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
PagestreamNblocksResponse, PagestreamProtocolVersion,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
PagestreamProtocolVersion,
};
use pageserver_api::shard::TenantShardId;
use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
@@ -44,7 +44,7 @@ use crate::basebackup;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics;
use crate::metrics::{self};
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
use crate::pgdatadir_mapping::Version;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -59,7 +59,7 @@ use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::SlruKind;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
@@ -105,6 +105,7 @@ pub fn spawn(
pg_auth,
tcp_listener,
conf.pg_auth_type,
conf.server_side_batch_timeout,
libpq_ctx,
cancel.clone(),
)
@@ -153,6 +154,7 @@ pub async fn libpq_listener_main(
auth: Option<Arc<SwappableJwtAuth>>,
listener: tokio::net::TcpListener,
auth_type: AuthType,
server_side_batch_timeout: Option<Duration>,
listener_ctx: RequestContext,
listener_cancel: CancellationToken,
) -> Connections {
@@ -183,6 +185,7 @@ pub async fn libpq_listener_main(
local_auth,
socket,
auth_type,
server_side_batch_timeout,
connection_ctx,
connections_cancel.child_token(),
));
@@ -210,6 +213,7 @@ async fn page_service_conn_main(
auth: Option<Arc<SwappableJwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
server_side_batch_timeout: Option<Duration>,
connection_ctx: RequestContext,
cancel: CancellationToken,
) -> ConnectionHandlerResult {
@@ -260,8 +264,13 @@ async fn page_service_conn_main(
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler =
PageServerHandler::new(tenant_manager, auth, connection_ctx, cancel.clone());
let mut conn_handler = PageServerHandler::new(
tenant_manager,
auth,
server_side_batch_timeout,
connection_ctx,
cancel.clone(),
);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend.run(&mut conn_handler, &cancel).await {
@@ -304,6 +313,12 @@ struct PageServerHandler {
cancel: CancellationToken,
timeline_handles: TimelineHandles,
/// Messages queued up for the next processing batch
next_batch: Option<BatchedFeMessage>,
/// See [`PageServerConf::server_side_batch_timeout`]
server_side_batch_timeout: Option<Duration>,
}
struct TimelineHandles {
@@ -517,10 +532,47 @@ impl From<WaitLsnError> for QueryError {
}
}
enum BatchedFeMessage {
Exists {
span: Span,
req: models::PagestreamExistsRequest,
},
Nblocks {
span: Span,
req: models::PagestreamNblocksRequest,
},
GetPage {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
effective_request_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
},
DbSize {
span: Span,
req: models::PagestreamDbSizeRequest,
},
GetSlruSegment {
span: Span,
req: models::PagestreamGetSlruSegmentRequest,
},
RespondError {
span: Span,
error: PageStreamError,
},
}
enum BatchOrEof {
/// In the common case, this has one entry.
/// At most, it has two entries: the first is the leftover batch, the second is an error.
Batch(smallvec::SmallVec<[BatchedFeMessage; 1]>),
Eof,
}
impl PageServerHandler {
pub fn new(
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
server_side_batch_timeout: Option<Duration>,
connection_ctx: RequestContext,
cancel: CancellationToken,
) -> Self {
@@ -530,6 +582,8 @@ impl PageServerHandler {
connection_ctx,
timeline_handles: TimelineHandles::new(tenant_manager),
cancel,
next_batch: None,
server_side_batch_timeout,
}
}
@@ -557,6 +611,221 @@ impl PageServerHandler {
)
}
async fn read_batch_from_connection<IO>(
&mut self,
pgb: &mut PostgresBackend<IO>,
tenant_id: &TenantId,
timeline_id: &TimelineId,
ctx: &RequestContext,
) -> Result<Option<BatchOrEof>, QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
let mut batch = self.next_batch.take();
let mut batch_started_at: Option<std::time::Instant> = None;
let next_batch: Option<BatchedFeMessage> = loop {
let sleep_fut = match (self.server_side_batch_timeout, batch_started_at) {
(Some(batch_timeout), Some(started_at)) => futures::future::Either::Left(
tokio::time::sleep_until((started_at + batch_timeout).into()),
),
_ => futures::future::Either::Right(futures::future::pending()),
};
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => {
msg
}
_ = sleep_fut => {
assert!(batch.is_some());
break None;
}
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => {
return Ok(Some(BatchOrEof::Eof));
}
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => {
return Ok(Some(BatchOrEof::Eof));
} // client disconnected
};
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
// parse request
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let this_msg = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => BatchedFeMessage::Exists {
span: tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn),
req,
},
PagestreamFeMessage::Nblocks(req) => BatchedFeMessage::Nblocks {
span: tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn),
req,
},
PagestreamFeMessage::DbSize(req) => BatchedFeMessage::DbSize {
span: tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn),
req,
},
PagestreamFeMessage::GetSlruSegment(req) => BatchedFeMessage::GetSlruSegment {
span: tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn),
req,
},
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
request_lsn,
not_modified_since,
rel,
blkno,
}) => {
// shard_id is filled in by the handler
let span = tracing::info_span!(
"handle_get_page_at_lsn_request_batched",
%tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn,
batch_size = tracing::field::Empty, batch_id = tracing::field::Empty
);
macro_rules! current_batch_and_error {
($error:expr) => {{
let error = BatchedFeMessage::RespondError {
span,
error: $error,
};
let batch_and_error = match batch {
Some(b) => smallvec::smallvec![b, error],
None => smallvec::smallvec![error],
};
Ok(Some(BatchOrEof::Batch(batch_and_error)))
}};
}
let key = rel_block_to_key(rel, blkno);
let shard = match self
.timeline_handles
.get(*tenant_id, *timeline_id, ShardSelector::Page(key))
.instrument(span.clone())
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return current_batch_and_error!(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into()
));
}
Err(e) => {
return current_batch_and_error!(e.into());
}
};
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
request_lsn,
not_modified_since,
&shard.get_latest_gc_cutoff_lsn(),
ctx,
)
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
.await
{
Ok(lsn) => lsn,
Err(e) => {
return current_batch_and_error!(e);
}
};
BatchedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages: smallvec::smallvec![(rel, blkno)],
}
}
};
let batch_timeout = match self.server_side_batch_timeout {
Some(value) => value,
None => {
// Batching is not enabled - stop on the first message.
return Ok(Some(BatchOrEof::Batch(smallvec::smallvec![this_msg])));
}
};
// check if we can batch
match (&mut batch, this_msg) {
(None, this_msg) => {
batch = Some(this_msg);
}
(
Some(BatchedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: accum_pages,
effective_request_lsn: accum_lsn,
}),
BatchedFeMessage::GetPage {
span: _,
shard: this_shard,
pages: this_pages,
effective_request_lsn: this_lsn,
},
) if async {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
if *accum_lsn != this_lsn {
return false;
}
true
}
.await =>
{
// ok to batch
accum_pages.extend(this_pages);
}
(Some(_), this_msg) => {
// by default, don't continue batching
break Some(this_msg);
}
}
// batching impl piece
let started_at = batch_started_at.get_or_insert_with(Instant::now);
if started_at.elapsed() > batch_timeout {
break None;
}
};
self.next_batch = next_batch;
Ok(batch.map(|b| BatchOrEof::Batch(smallvec::smallvec![b])))
}
/// Pagestream sub-protocol handler.
///
/// It is a simple request-response protocol inside a COPYBOTH session.
@@ -592,133 +861,165 @@ impl PageServerHandler {
}
}
// If [`PageServerHandler`] is reused for multiple pagestreams,
// then make sure to not process requests from the previous ones.
self.next_batch = None;
loop {
// read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
let maybe_batched = self
.read_batch_from_connection(pgb, &tenant_id, &timeline_id, &ctx)
.await?;
let batched = match maybe_batched {
Some(BatchOrEof::Batch(b)) => b,
Some(BatchOrEof::Eof) => {
break;
}
msg = pgb.read_message() => { msg }
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break,
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => break, // client disconnected
};
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
// parse request
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
// invoke handler function
let (handler_result, span) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
PagestreamFeMessage::Nblocks(req) => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
PagestreamFeMessage::GetPage(req) => {
fail::fail_point!("ps::handle-pagerequest-message::getpage");
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
(
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
PagestreamFeMessage::DbSize(req) => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
(
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
(
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
None => {
continue;
}
};
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
let response_msg = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
for batch in batched {
// invoke handler function
let (handler_results, span): (
Vec<Result<PagestreamBeMessage, PageStreamError>>,
_,
) = match batch {
BatchedFeMessage::Exists { span, req } => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
(
vec![
self.handle_get_rel_exists_request(
tenant_id,
timeline_id,
&req,
&ctx,
)
.instrument(span.clone())
.await,
],
span,
)
}
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
BatchedFeMessage::Nblocks { span, req } => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
(
vec![
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
],
span,
)
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
let full = utils::error::report_compact_sources(&e);
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
BatchedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages,
} => {
fail::fail_point!("ps::handle-pagerequest-message::getpage");
(
{
let npages = pages.len();
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
effective_request_lsn,
pages,
&ctx,
)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
span,
)
}
},
Ok(response_msg) => response_msg,
};
BatchedFeMessage::DbSize { span, req } => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
(
vec![
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
],
span,
)
}
BatchedFeMessage::GetSlruSegment { span, req } => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
(
vec![
self.handle_get_slru_segment_request(
tenant_id,
timeline_id,
&req,
&ctx,
)
.instrument(span.clone())
.await,
],
span,
)
}
BatchedFeMessage::RespondError { span, error } => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(vec![Err(error)], span)
}
};
// marshal & transmit response message
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
tokio::select! {
biased;
_ = self.cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
for handler_result in handler_results {
let response_msg = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
let full = utils::error::report_compact_sources(&e);
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
}
},
Ok(response_msg) => response_msg,
};
// marshal & transmit response message
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
}
res = pgb.flush() => {
res?;
tokio::select! {
biased;
_ = self.cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
}
res = pgb.flush() => {
res?;
}
}
}
}
@@ -964,60 +1265,30 @@ impl PageServerHandler {
}))
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_page_at_lsn_request(
#[instrument(skip_all)]
async fn handle_get_page_at_lsn_request_batched(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamGetPageRequest,
timeline: &Timeline,
effective_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = match self
.timeline_handles
.get(
tenant_id,
timeline_id,
ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
)
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
}
Err(e) => return Err(e.into()),
};
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
) -> Vec<Result<PagestreamBeMessage, PageStreamError>> {
debug_assert_current_span_has_tenant_and_timeline_id();
let _timer = timeline.query_metrics.start_timer_many(
metrics::SmgrQueryType::GetPageAtLsn,
pages.len(),
ctx,
)
.await?;
);
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
.await?;
let pages = timeline
.get_rel_page_at_lsn_batched(pages, effective_lsn, ctx)
.await;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
Vec::from_iter(pages.into_iter().map(|page| {
page.map(|page| {
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
})
.map_err(PageStreamError::from)
}))
}
@@ -1674,6 +1945,13 @@ fn set_tracing_field_shard_id(timeline: &Timeline) {
debug_assert_current_span_has_tenant_and_timeline_id();
}
struct WaitedForLsn(Lsn);
impl From<WaitedForLsn> for Lsn {
fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
lsn
}
}
#[cfg(test)]
mod tests {
use utils::shard::ShardCount;

View File

@@ -10,10 +10,15 @@ use super::tenant::{PageReconstructError, Timeline};
use crate::aux_file;
use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::timeline::GetVectoredError;
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::key::{
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
@@ -30,7 +35,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
@@ -193,26 +198,195 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
match version {
Version::Lsn(effective_lsn) => {
let pages = smallvec::smallvec![(tag, blknum)];
let res = self
.get_rel_page_at_lsn_batched(pages, effective_lsn, ctx)
.await;
assert_eq!(res.len(), 1);
res.into_iter().next().unwrap()
}
Version::Modified(modification) => {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
}
let nblocks = self.get_rel_size(tag, version, ctx).await?;
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
return Ok(ZERO_PAGE.clone());
}
let key = rel_block_to_key(tag, blknum);
modification.get(key, ctx).await
}
}
}
/// Like [`Self::get_rel_page_at_lsn`], but returns a batch of pages.
///
/// The ordering of the returned vec corresponds to the ordering of `pages`.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
effective_lsn: Lsn,
ctx: &RequestContext,
) -> Vec<Result<Bytes, PageReconstructError>> {
debug_assert_current_span_has_tenant_and_timeline_id();
let mut slots_filled = 0;
let page_count = pages.len();
// Would be nice to use smallvec here but it doesn't provide the spare_capacity_mut() API.
let mut result = Vec::with_capacity(pages.len());
let result_slots = result.spare_capacity_mut();
let mut keys_slots: BTreeMap<Key, smallvec::SmallVec<[usize; 1]>> = BTreeMap::default();
for (response_slot_idx, (tag, blknum)) in pages.into_iter().enumerate() {
if tag.relnode == 0 {
result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
)));
slots_filled += 1;
continue;
}
let nblocks = match self
.get_rel_size(tag, Version::Lsn(effective_lsn), ctx)
.await
{
Ok(nblocks) => nblocks,
Err(err) => {
result_slots[response_slot_idx].write(Err(err));
slots_filled += 1;
continue;
}
};
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag, blknum, effective_lsn, nblocks
);
result_slots[response_slot_idx].write(Ok(ZERO_PAGE.clone()));
slots_filled += 1;
continue;
}
let key = rel_block_to_key(tag, blknum);
let key_slots = keys_slots.entry(key).or_default();
key_slots.push(response_slot_idx);
}
let nblocks = self.get_rel_size(tag, version, ctx).await?;
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
return Ok(ZERO_PAGE.clone());
let keyspace = {
// add_key requires monotonicity
let mut acc = KeySpaceAccum::new();
for key in keys_slots
.keys()
// in fact it requires strong monotonicity
.dedup()
{
acc.add_key(*key);
}
acc.to_keyspace()
};
match self.get_vectored(keyspace, effective_lsn, ctx).await {
Ok(results) => {
for (key, res) in results {
let mut key_slots = keys_slots.remove(&key).unwrap().into_iter();
let first_slot = key_slots.next().unwrap();
for slot in key_slots {
let clone = match &res {
Ok(buf) => Ok(buf.clone()),
Err(err) => Err(match err {
PageReconstructError::Cancelled => {
PageReconstructError::Cancelled
}
x @ PageReconstructError::Other(_) |
x @ PageReconstructError::AncestorLsnTimeout(_) |
x @ PageReconstructError::WalRedo(_) |
x @ PageReconstructError::MissingKey(_) => {
PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
},
}),
};
result_slots[slot].write(clone);
slots_filled += 1;
}
result_slots[first_slot].write(res);
slots_filled += 1;
}
}
Err(err) => {
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
// (We enforce the max batch size outside of this function, in the code that constructs the batch request.)
for slot in keys_slots.values().flatten() {
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
// but without taking ownership of the GetVectoredError
let err = match &err {
GetVectoredError::Cancelled => {
Err(PageReconstructError::Cancelled)
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::MissingKey(err) => {
Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}")))
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::GetReadyAncestorError(err) => {
Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}")))
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::Other(err) => {
Err(PageReconstructError::Other(
anyhow::anyhow!("whole vectored get request failed: {err:?}"),
))
}
// TODO: we can prevent this error class by moving this check into the type system
GetVectoredError::InvalidLsn(e) => {
Err(anyhow::anyhow!("invalid LSN: {e:?}").into())
}
// NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
// TODO: we can prevent this error class by moving this check into the type system
GetVectoredError::Oversized(err) => {
Err(anyhow::anyhow!(
"batching oversized: {err:?}"
)
.into())
}
};
result_slots[*slot].write(err);
}
slots_filled += keys_slots.values().map(|slots| slots.len()).sum::<usize>();
}
};
assert_eq!(slots_filled, page_count);
// SAFETY:
// 1. `result` and any of its uninint members are not read from until this point
// 2. The length below is tracked at run-time and matches the number of requested pages.
unsafe {
result.set_len(page_count);
}
let key = rel_block_to_key(tag, blknum);
version.get(self, key, ctx).await
result
}
// Get size of a database in blocks

View File

@@ -1528,6 +1528,11 @@ mod tests {
assert_current_logical_size(&tline, Lsn(0x50));
let test_span = tracing::info_span!(parent: None, "test",
tenant_id=%tline.tenant_shard_id.tenant_id,
shard_id=%tline.tenant_shard_id.shard_slug(),
timeline_id=%tline.timeline_id);
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(
tline
@@ -1562,6 +1567,7 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), &ctx)
.instrument(test_span.clone())
.await?,
test_img("foo blk 0 at 2")
);
@@ -1569,6 +1575,7 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), &ctx)
.instrument(test_span.clone())
.await?,
test_img("foo blk 0 at 3")
);
@@ -1576,12 +1583,14 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), &ctx)
.instrument(test_span.clone())
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), &ctx)
.instrument(test_span.clone())
.await?,
test_img("foo blk 1 at 4")
);
@@ -1589,18 +1598,21 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), &ctx)
.instrument(test_span.clone())
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), &ctx)
.instrument(test_span.clone())
.await?,
test_img("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
.instrument(test_span.clone())
.await?,
test_img("foo blk 2 at 5")
);
@@ -1623,12 +1635,14 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), &ctx)
.instrument(test_span.clone())
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), &ctx)
.instrument(test_span.clone())
.await?,
test_img("foo blk 1 at 4")
);
@@ -1643,6 +1657,7 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), &ctx)
.instrument(test_span.clone())
.await?,
test_img("foo blk 2 at 5")
);
@@ -1675,12 +1690,14 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), &ctx)
.instrument(test_span.clone())
.await?,
ZERO_PAGE
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), &ctx)
.instrument(test_span.clone())
.await?,
test_img("foo blk 1")
);
@@ -1701,6 +1718,7 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), &ctx)
.instrument(test_span.clone())
.await?,
ZERO_PAGE
);
@@ -1708,6 +1726,7 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), &ctx)
.instrument(test_span.clone())
.await?,
test_img("foo blk 1500")
);
@@ -1815,6 +1834,11 @@ mod tests {
}
m.commit(&ctx).await?;
let test_span = tracing::info_span!(parent: None, "test",
tenant_id=%tline.tenant_shard_id.tenant_id,
shard_id=%tline.tenant_shard_id.shard_slug(),
timeline_id=%tline.timeline_id);
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
tline
@@ -1847,6 +1871,7 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), &ctx)
.instrument(test_span.clone())
.await?,
test_img(&data)
);
@@ -1874,6 +1899,7 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), &ctx)
.instrument(test_span.clone())
.await?,
test_img(&data)
);
@@ -1892,6 +1918,7 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), &ctx)
.instrument(test_span.clone())
.await?,
test_img(&data)
);
@@ -1928,6 +1955,7 @@ mod tests {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), &ctx)
.instrument(test_span.clone())
.await?,
test_img(&data)
);