From 5d194c78247fd39f30f4f24c0ee20150e4e582b6 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 12 Sep 2024 11:13:37 +0000 Subject: [PATCH] debounce: bounce if shard or effective request_lsn differ --- pageserver/src/page_service.rs | 252 ++++++++++++++++++++++++--------- 1 file changed, 189 insertions(+), 63 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index daac888963..9f33662a4b 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -6,7 +6,7 @@ use async_compression::tokio::write::GzipEncoder; use bytes::Buf; use futures::FutureExt; use once_cell::sync::{Lazy, OnceCell}; -use pageserver_api::models::TenantState; +use pageserver_api::models::{self, TenantState}; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, @@ -58,7 +58,7 @@ use crate::tenant::GetTimelineError; use crate::tenant::PageReconstructError; use crate::tenant::Timeline; use pageserver_api::key::rel_block_to_key; -use pageserver_api::reltag::SlruKind; +use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::BLCKSZ; @@ -577,13 +577,27 @@ impl PageServerHandler { } } - let mut requests = Vec::new(); + let mut batch = Vec::new(); let mut num_consecutive_getpage_requests = 0; 'outer: loop { - // read request bytes (it's exactly 1 PagestreamFeMessage per CopyData) + enum DebouncedFeMessage { + Exists(models::PagestreamExistsRequest), + Nblocks(models::PagestreamNblocksRequest), + GetPage { + span: Span, + timeline: timeline::handle::Handle, + rel: RelTag, + blkno: BlockNumber, + effective_request_lsn: Lsn, + }, + DbSize(models::PagestreamDbSizeRequest), + GetSlruSegment(models::PagestreamGetSlruSegmentRequest), + RespondError(Span, PageStreamError), + } let mut debounce: Option = None; - requests.clear(); - loop { + // return or `?` on protocol error + // `break EXPR` to stop batching. The EXPR will be the first message in the next batch. + let after_batch: Option = loop { static BOUNCE_TIMEOUT: Lazy = Lazy::new(|| { utils::env::var::("NEON_PAGESERVER_DEBOUNCE") .unwrap() @@ -605,7 +619,8 @@ impl PageServerHandler { msg } _ = sleep_fut => { - break; + assert!(!batch.is_empty()); + break None; } }; let copy_data_bytes = match msg? { @@ -624,23 +639,141 @@ impl PageServerHandler { // parse request let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; - requests.push(neon_fe_msg); + let this_msg = match neon_fe_msg { + PagestreamFeMessage::Exists(msg) => DebouncedFeMessage::Exists(msg), + PagestreamFeMessage::Nblocks(msg) => DebouncedFeMessage::Nblocks(msg), + PagestreamFeMessage::DbSize(msg) => DebouncedFeMessage::DbSize(msg), + PagestreamFeMessage::GetSlruSegment(msg) => { + DebouncedFeMessage::GetSlruSegment(msg) + } + PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + request_lsn, + not_modified_since, + rel, + blkno, + }) => { + let span = tracing::info_span!("handle_get_page_at_lsn_request", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, %rel, %blkno, req_lsn = %request_lsn); + let key = rel_block_to_key(rel, blkno); + let timeline = match self + .timeline_handles + .get(tenant_id, timeline_id, ShardSelector::Page(key)) + .await + { + Ok(tl) => tl, + Err(GetActiveTimelineError::Tenant( + GetActiveTenantError::NotFound(_), + )) => { + // We already know this tenant exists in general, because we resolved it at + // start of connection. Getting a NotFound here indicates that the shard containing + // the requested page is not present on this node: the client's knowledge of shard->pageserver + // mapping is out of date. + // + // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via + // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration + // and talk to a different pageserver. + break Some(DebouncedFeMessage::RespondError( + span, + PageStreamError::Reconnect( + "getpage@lsn request routed to wrong shard".into(), + ), + )); + } + Err(e) => break Some(DebouncedFeMessage::RespondError(span, e.into())), + }; + span.record( + "shard_id", + tracing::field::display(timeline.tenant_shard_id.shard_slug()), + ); + let effective_request_lsn = match Self::wait_or_get_last_lsn( + &timeline, + request_lsn, + not_modified_since, + &timeline.get_latest_gc_cutoff_lsn(), + &ctx, + ) + // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait + .await + { + Ok(lsn) => lsn, + Err(e) => { + break Some(DebouncedFeMessage::RespondError(span, e)); + } + }; + DebouncedFeMessage::GetPage { + span, + timeline, + rel, + blkno, + effective_request_lsn, + } + } + }; - // debounce + // check if we can debounce + match (batch.last(), this_msg) { + (None, this_msg) => { + batch.push(this_msg); + } + ( + Some(DebouncedFeMessage::GetPage { + span: _, + timeline: prev_shard, + rel: _, + blkno: _, + effective_request_lsn: prev_lsn, + }), + DebouncedFeMessage::GetPage { + span, + timeline: this_shard, + rel, + blkno, + effective_request_lsn: this_lsn, + }, + ) if async { + if (prev_shard.tenant_shard_id, prev_shard.timeline_id) + != (this_shard.tenant_shard_id, this_shard.timeline_id) + { + // TODO: we _could_ batch & execute each shard seperately (and in parallel). + // But the current logig for keeping responses in order does not support that. + return false; + } + // the vectored get currently only supports a single LSN, so, bounce as soon + // as the effective request_lsn changes + return *prev_lsn == this_lsn; + } + .await => + { + // ok to batch + batch.push(DebouncedFeMessage::GetPage { + span, + timeline: this_shard, + rel, + blkno, + effective_request_lsn: this_lsn, + }); + } + (Some(_), this_msg) => { + // by default, don't continue batching + break Some(this_msg); + } + } + + // debounce impl piece let started_at = debounce.get_or_insert_with(Instant::now); if started_at.elapsed() > *BOUNCE_TIMEOUT { - break; + break None; } - } + }; + assert!(!batch.is_empty()); CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM .observe(num_consecutive_getpage_requests as f64); num_consecutive_getpage_requests = 0; - for neon_fe_msg in requests.drain(..) { + for msg in batch.drain(..) { // invoke handler function - let (handler_result, span) = match neon_fe_msg { - PagestreamFeMessage::Exists(req) => { + let (handler_result, span) = match msg { + DebouncedFeMessage::Exists(req) => { CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM .observe(num_consecutive_getpage_requests as f64); num_consecutive_getpage_requests = 0; @@ -654,7 +787,7 @@ impl PageServerHandler { span, ) } - PagestreamFeMessage::Nblocks(req) => { + DebouncedFeMessage::Nblocks(req) => { CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM .observe(num_consecutive_getpage_requests as f64); num_consecutive_getpage_requests = 0; @@ -667,19 +800,31 @@ impl PageServerHandler { span, ) } - PagestreamFeMessage::GetPage(req) => { + DebouncedFeMessage::GetPage { + span, + timeline, + rel, + blkno, + effective_request_lsn, + } => { num_consecutive_getpage_requests += 1; fail::fail_point!("ps::handle-pagerequest-message::getpage"); // shard_id is filled in by the handler - let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn); ( - self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx) - .instrument(span.clone()) - .await, + // TODO: issue vectored get + self.handle_get_page_at_lsn_request( + &timeline, + rel, + blkno, + effective_request_lsn, + &ctx, + ) + .instrument(span.clone()) + .await, span, ) } - PagestreamFeMessage::DbSize(req) => { + DebouncedFeMessage::DbSize(req) => { CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM .observe(num_consecutive_getpage_requests as f64); num_consecutive_getpage_requests = 0; @@ -692,7 +837,7 @@ impl PageServerHandler { span, ) } - PagestreamFeMessage::GetSlruSegment(req) => { + DebouncedFeMessage::GetSlruSegment(req) => { CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM .observe(num_consecutive_getpage_requests as f64); num_consecutive_getpage_requests = 0; @@ -710,6 +855,11 @@ impl PageServerHandler { span, ) } + DebouncedFeMessage::RespondError(span, e) => { + // We've already decided to respond with an error, so we don't need to + // call the handler. + (Err(e), span) + } }; // Map handler result to protocol behavior. @@ -761,6 +911,9 @@ impl PageServerHandler { } } } + + assert!(batch.is_empty()); + batch.extend(after_batch.into_iter()); } Ok(()) } @@ -1004,56 +1157,22 @@ impl PageServerHandler { })) } - #[instrument(skip_all, fields(shard_id))] + #[instrument(skip_all)] async fn handle_get_page_at_lsn_request( &mut self, - tenant_id: TenantId, - timeline_id: TimelineId, - req: &PagestreamGetPageRequest, + timeline: &Timeline, + rel: RelTag, + blkno: BlockNumber, + effective_lsn: Lsn, ctx: &RequestContext, ) -> Result { - let timeline = match self - .timeline_handles - .get( - tenant_id, - timeline_id, - ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)), - ) - .await - { - Ok(tl) => tl, - Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { - // We already know this tenant exists in general, because we resolved it at - // start of connection. Getting a NotFound here indicates that the shard containing - // the requested page is not present on this node: the client's knowledge of shard->pageserver - // mapping is out of date. - // - // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via - // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration - // and talk to a different pageserver. - return Err(PageStreamError::Reconnect( - "getpage@lsn request routed to wrong shard".into(), - )); - } - Err(e) => return Err(e.into()), - }; - + debug_assert_current_span_has_tenant_and_timeline_id(); let _timer = timeline .query_metrics .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx); - let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); - let lsn = Self::wait_or_get_last_lsn( - &timeline, - req.request_lsn, - req.not_modified_since, - &latest_gc_cutoff_lsn, - ctx, - ) - .await?; - let page = timeline - .get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx) + .get_rel_page_at_lsn(rel, blkno, Version::Lsn(effective_lsn), ctx) .await?; Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { @@ -1554,3 +1673,10 @@ fn set_tracing_field_shard_id(timeline: &Timeline) { ); debug_assert_current_span_has_tenant_and_timeline_id(); } + +struct WaitedForLsn(Lsn); +impl From for Lsn { + fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self { + lsn + } +}