debounce: bounce if shard or effective request_lsn differ
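
This change restructures the page-service read loop: each incoming
PagestreamFeMessage is parsed into a DebouncedFeMessage up front (for GetPage
requests this includes resolving the target shard and the effective request
LSN), and consecutive GetPage requests are batched only while they hit the
same tenant shard, the same timeline, and the same effective request LSN.
Any mismatch "bounces": the current batch is flushed and the mismatching
request starts the next one. A minimal sketch of that batching predicate,
using simplified stand-in types (ShardKey, a plain u64 LSN) rather than the
pageserver's own:

// Sketch only: `ShardKey` and the u64 LSN stand in for the pageserver's
// TenantShardId/TimelineId/Lsn types.
#[derive(PartialEq, Eq, Clone, Copy)]
struct ShardKey {
    tenant_shard: u32,
    timeline: u32,
}

fn can_batch(prev: (ShardKey, u64), this: (ShardKey, u64)) -> bool {
    let ((prev_shard, prev_lsn), (this_shard, this_lsn)) = (prev, this);
    if prev_shard != this_shard {
        // Different shards could in principle be batched separately and
        // executed in parallel, but the response-ordering logic does not
        // support that yet (see the TODO in the diff).
        return false;
    }
    // The vectored get supports a single LSN per batch, so a change in the
    // effective request LSN also ends the batch.
    prev_lsn == this_lsn
}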

This commit is contained in:
Christian Schwarz
2024-09-12 11:13:37 +00:00
parent ac2702afd3
commit 5d194c7824


@@ -6,7 +6,7 @@ use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::FutureExt;
use once_cell::sync::{Lazy, OnceCell};
use pageserver_api::models::TenantState;
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -58,7 +58,7 @@ use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::SlruKind;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
@@ -577,13 +577,27 @@ impl PageServerHandler {
}
}
let mut requests = Vec::new();
let mut batch = Vec::new();
let mut num_consecutive_getpage_requests = 0;
'outer: loop {
// read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
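// A parsed frontend message. GetPage requests are pre-resolved to their
// target shard (a timeline handle) and effective request LSN so that the
// batching logic below can compare consecutive requests cheaply.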
enum DebouncedFeMessage {
Exists(models::PagestreamExistsRequest),
Nblocks(models::PagestreamNblocksRequest),
GetPage {
span: Span,
timeline: timeline::handle::Handle<TenantManagerTypes>,
rel: RelTag,
blkno: BlockNumber,
effective_request_lsn: Lsn,
},
DbSize(models::PagestreamDbSizeRequest),
GetSlruSegment(models::PagestreamGetSlruSegmentRequest),
RespondError(Span, PageStreamError),
}
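// Start of the current batch's debounce window; set when its first message arrives.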
let mut debounce: Option<std::time::Instant> = None;
requests.clear();
loop {
// return or `?` on protocol error
// `break EXPR` to stop batching. The EXPR will be the first message in the next batch.
let after_batch: Option<DebouncedFeMessage> = loop {
static BOUNCE_TIMEOUT: Lazy<Duration> = Lazy::new(|| {
utils::env::var::<humantime::Duration, _>("NEON_PAGESERVER_DEBOUNCE")
.unwrap()
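
The batch window above is parsed lazily from the NEON_PAGESERVER_DEBOUNCE
environment variable. A self-contained sketch of the same pattern using std
and the humantime crate directly (utils::env::var is a pageserver-internal
helper):

use once_cell::sync::Lazy;
use std::time::Duration;

// Parse a humantime-formatted duration (e.g. "10ms") from the environment,
// once, on first access; panic if unset or unparsable, mirroring the
// .unwrap() in the diff.
static BOUNCE_TIMEOUT: Lazy<Duration> = Lazy::new(|| {
    std::env::var("NEON_PAGESERVER_DEBOUNCE")
        .expect("NEON_PAGESERVER_DEBOUNCE not set")
        .parse::<humantime::Duration>()
        .expect("expected a duration like '10ms'")
        .into()
});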
@@ -605,7 +619,8 @@ impl PageServerHandler {
msg
}
_ = sleep_fut => {
break;
assert!(!batch.is_empty());
break None;
}
};
let copy_data_bytes = match msg? {
@@ -624,23 +639,141 @@ impl PageServerHandler {
// parse request
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
requests.push(neon_fe_msg);
let this_msg = match neon_fe_msg {
PagestreamFeMessage::Exists(msg) => DebouncedFeMessage::Exists(msg),
PagestreamFeMessage::Nblocks(msg) => DebouncedFeMessage::Nblocks(msg),
PagestreamFeMessage::DbSize(msg) => DebouncedFeMessage::DbSize(msg),
PagestreamFeMessage::GetSlruSegment(msg) => {
DebouncedFeMessage::GetSlruSegment(msg)
}
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
request_lsn,
not_modified_since,
rel,
blkno,
}) => {
let span = tracing::info_span!("handle_get_page_at_lsn_request", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, %rel, %blkno, req_lsn = %request_lsn);
let key = rel_block_to_key(rel, blkno);
let timeline = match self
.timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(
GetActiveTenantError::NotFound(_),
)) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
break Some(DebouncedFeMessage::RespondError(
span,
PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
),
));
}
Err(e) => break Some(DebouncedFeMessage::RespondError(span, e.into())),
};
span.record(
"shard_id",
tracing::field::display(timeline.tenant_shard_id.shard_slug()),
);
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&timeline,
request_lsn,
not_modified_since,
&timeline.get_latest_gc_cutoff_lsn(),
&ctx,
)
// TODO: if we actually need to wait for the LSN here, it delays the entire batch, even though the other requests in it don't need to wait
.await
{
Ok(lsn) => lsn,
Err(e) => {
break Some(DebouncedFeMessage::RespondError(span, e));
}
};
DebouncedFeMessage::GetPage {
span,
timeline,
rel,
blkno,
effective_request_lsn,
}
}
};
// debounce
// check if we can debounce
match (batch.last(), this_msg) {
(None, this_msg) => {
batch.push(this_msg);
}
(
Some(DebouncedFeMessage::GetPage {
span: _,
timeline: prev_shard,
rel: _,
blkno: _,
effective_request_lsn: prev_lsn,
}),
DebouncedFeMessage::GetPage {
span,
timeline: this_shard,
rel,
blkno,
effective_request_lsn: this_lsn,
},
) if async {
if (prev_shard.tenant_shard_id, prev_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
// The vectored get currently only supports a single LSN, so bounce as soon
// as the effective request_lsn changes.
return *prev_lsn == this_lsn;
}
.await =>
{
// ok to batch
batch.push(DebouncedFeMessage::GetPage {
span,
timeline: this_shard,
rel,
blkno,
effective_request_lsn: this_lsn,
});
}
(Some(_), this_msg) => {
// by default, don't continue batching
break Some(this_msg);
}
}
// Debounce window bookkeeping: the timer starts with the batch's first message; once it elapses, stop batching.
let started_at = debounce.get_or_insert_with(Instant::now);
if started_at.elapsed() > *BOUNCE_TIMEOUT {
break;
break None;
}
}
};
assert!(!batch.is_empty());
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM
.observe(num_consecutive_getpage_requests as f64);
num_consecutive_getpage_requests = 0;
for neon_fe_msg in requests.drain(..) {
for msg in batch.drain(..) {
// invoke handler function
let (handler_result, span) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let (handler_result, span) = match msg {
DebouncedFeMessage::Exists(req) => {
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM
.observe(num_consecutive_getpage_requests as f64);
num_consecutive_getpage_requests = 0;
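
Every non-GetPage arm, like the one above, flushes the running count of
consecutive GetPage requests into the histogram and resets it; only the
GetPage arm increments it. The histogram therefore records the length of
each uninterrupted run of GetPage requests. Schematically (the observe
callback stands in for the prometheus histogram):

// Run-length accounting for consecutive GetPage requests, schematic only.
fn account(is_getpage: bool, run: &mut u64, mut observe: impl FnMut(f64)) {
    if is_getpage {
        *run += 1; // extend the current run
    } else {
        observe(*run as f64); // a non-GetPage message ends the run
        *run = 0;
    }
}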
@@ -654,7 +787,7 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::Nblocks(req) => {
DebouncedFeMessage::Nblocks(req) => {
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM
.observe(num_consecutive_getpage_requests as f64);
num_consecutive_getpage_requests = 0;
@@ -667,19 +800,31 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::GetPage(req) => {
DebouncedFeMessage::GetPage {
span,
timeline,
rel,
blkno,
effective_request_lsn,
} => {
num_consecutive_getpage_requests += 1;
fail::fail_point!("ps::handle-pagerequest-message::getpage");
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
(
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
// TODO: issue vectored get
self.handle_get_page_at_lsn_request(
&timeline,
rel,
blkno,
effective_request_lsn,
&ctx,
)
.instrument(span.clone())
.await,
span,
)
}
PagestreamFeMessage::DbSize(req) => {
DebouncedFeMessage::DbSize(req) => {
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM
.observe(num_consecutive_getpage_requests as f64);
num_consecutive_getpage_requests = 0;
@@ -692,7 +837,7 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
DebouncedFeMessage::GetSlruSegment(req) => {
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM
.observe(num_consecutive_getpage_requests as f64);
num_consecutive_getpage_requests = 0;
@@ -710,6 +855,11 @@ impl PageServerHandler {
span,
)
}
DebouncedFeMessage::RespondError(span, e) => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(Err(e), span)
}
};
// Map handler result to protocol behavior.
@@ -761,6 +911,9 @@ impl PageServerHandler {
}
}
}
assert!(batch.is_empty());
batch.extend(after_batch.into_iter());
}
Ok(())
}
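
The next hunk refactors handle_get_page_at_lsn_request to match: shard
resolution and the wait_or_get_last_lsn call move out of the handler into
the batching loop above, so the handler receives an already-resolved
timeline handle and effective LSN. Roughly, the signature changes as
follows (abbreviated from the diff):

// before: the handler resolved the shard and waited for the LSN itself
async fn handle_get_page_at_lsn_request(
    &mut self,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    req: &PagestreamGetPageRequest,
    ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError>;

// after: the caller has done both; the handler just reads the page
async fn handle_get_page_at_lsn_request(
    &mut self,
    timeline: &Timeline,
    rel: RelTag,
    blkno: BlockNumber,
    effective_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError>;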
@@ -1004,56 +1157,22 @@ impl PageServerHandler {
}))
}
#[instrument(skip_all, fields(shard_id))]
#[instrument(skip_all)]
async fn handle_get_page_at_lsn_request(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamGetPageRequest,
timeline: &Timeline,
rel: RelTag,
blkno: BlockNumber,
effective_lsn: Lsn,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = match self
.timeline_handles
.get(
tenant_id,
timeline_id,
ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
)
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning `PageStreamError::Reconnect` has the side effect of rate-limiting the above message, via the
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
}
Err(e) => return Err(e.into()),
};
debug_assert_current_span_has_tenant_and_timeline_id();
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
ctx,
)
.await?;
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
.get_rel_page_at_lsn(rel, blkno, Version::Lsn(effective_lsn), ctx)
.await?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -1554,3 +1673,10 @@ fn set_tracing_field_shard_id(timeline: &Timeline) {
);
debug_assert_current_span_has_tenant_and_timeline_id();
}
struct WaitedForLsn(Lsn);
impl From<WaitedForLsn> for Lsn {
fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
lsn
}
}
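
The trailing WaitedForLsn newtype wraps an Lsn so the type system can record
that wait_or_get_last_lsn has already been applied to it. A hypothetical
illustration of the pattern (read_page_at is an invented example, not part
of this commit):

// Hypothetical: only the wait path constructs a WaitedForLsn, so a raw,
// un-waited Lsn cannot be passed here by accident.
fn read_page_at(lsn: WaitedForLsn) {
    let lsn: Lsn = lsn.into(); // unwrap via the From impl above
    // ... perform the read at `lsn` ...
}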