diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 2ae8d91b83..37c728a7f0 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1185,6 +1185,7 @@ struct GlobalAndPerTimelineHistogramTimer<'a, 'c> { ctx: &'c RequestContext, start: std::time::Instant, op: SmgrQueryType, + count: usize, } impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> { @@ -1212,9 +1213,11 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> { elapsed } }; - self.global_metric.observe(ex_throttled.as_secs_f64()); - if let Some(timeline_metric) = self.timeline_metric { - timeline_metric.observe(ex_throttled.as_secs_f64()); + for _ in 0..self.count { + self.global_metric.observe(ex_throttled.as_secs_f64()); + if let Some(timeline_metric) = self.timeline_metric { + timeline_metric.observe(ex_throttled.as_secs_f64()); + } } } } @@ -1343,6 +1346,14 @@ impl SmgrQueryTimePerTimeline { &'a self, op: SmgrQueryType, ctx: &'c RequestContext, + ) -> Option { + self.start_timer_many(op, 1, ctx) + } + pub(crate) fn start_timer_many<'c: 'a, 'a>( + &'a self, + op: SmgrQueryType, + count: usize, + ctx: &'c RequestContext, ) -> Option { let global_metric = &self.global_metrics[op as usize]; let start = Instant::now(); @@ -1376,6 +1387,7 @@ impl SmgrQueryTimePerTimeline { ctx, start, op, + count, }) } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 9f33662a4b..99d0dc4250 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -10,9 +10,9 @@ use pageserver_api::models::{self, TenantState}; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, - PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, - PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, - PagestreamNblocksResponse, PagestreamProtocolVersion, + PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest, + PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse, + PagestreamProtocolVersion, }; use pageserver_api::shard::TenantShardId; use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError}; @@ -577,18 +577,16 @@ impl PageServerHandler { } } - let mut batch = Vec::new(); - let mut num_consecutive_getpage_requests = 0; + let mut batched = None; 'outer: loop { enum DebouncedFeMessage { Exists(models::PagestreamExistsRequest), Nblocks(models::PagestreamNblocksRequest), GetPage { span: Span, - timeline: timeline::handle::Handle, - rel: RelTag, - blkno: BlockNumber, + shard: timeline::handle::Handle, effective_request_lsn: Lsn, + pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>, }, DbSize(models::PagestreamDbSizeRequest), GetSlruSegment(models::PagestreamGetSlruSegmentRequest), @@ -597,7 +595,7 @@ impl PageServerHandler { let mut debounce: Option = None; // return or `?` on protocol error // `break EXPR` to stop batching. The EXPR will be the first message in the next batch. 
- let after_batch: Option = loop { + let next_batched: Option = loop { static BOUNCE_TIMEOUT: Lazy = Lazy::new(|| { utils::env::var::("NEON_PAGESERVER_DEBOUNCE") .unwrap() @@ -619,7 +617,7 @@ impl PageServerHandler { msg } _ = sleep_fut => { - assert!(!batch.is_empty()); + assert!(batched.is_some()); break None; } }; @@ -652,11 +650,12 @@ impl PageServerHandler { rel, blkno, }) => { - let span = tracing::info_span!("handle_get_page_at_lsn_request", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, %rel, %blkno, req_lsn = %request_lsn); + let span = tracing::info_span!("handle_get_page_at_lsn_request_batched", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, batch_size = tracing::field::Empty); let key = rel_block_to_key(rel, blkno); - let timeline = match self + let shard = match self .timeline_handles .get(tenant_id, timeline_id, ShardSelector::Page(key)) + .instrument(span.clone()) .await { Ok(tl) => tl, @@ -680,15 +679,11 @@ impl PageServerHandler { } Err(e) => break Some(DebouncedFeMessage::RespondError(span, e.into())), }; - span.record( - "shard_id", - tracing::field::display(timeline.tenant_shard_id.shard_slug()), - ); let effective_request_lsn = match Self::wait_or_get_last_lsn( - &timeline, + &shard, request_lsn, not_modified_since, - &timeline.get_latest_gc_cutoff_lsn(), + &shard.get_latest_gc_cutoff_lsn(), &ctx, ) // TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait @@ -701,36 +696,38 @@ impl PageServerHandler { }; DebouncedFeMessage::GetPage { span, - timeline, - rel, - blkno, + shard, effective_request_lsn, + pages: smallvec::smallvec![(rel, blkno)], } } }; // check if we can debounce - match (batch.last(), this_msg) { + match (&mut batched, this_msg) { (None, this_msg) => { - batch.push(this_msg); + batched = Some(this_msg); } ( Some(DebouncedFeMessage::GetPage { span: _, - timeline: prev_shard, - rel: _, - blkno: _, - effective_request_lsn: prev_lsn, + shard: accum_shard, + pages: accum_pages, + effective_request_lsn: accum_lsn, }), DebouncedFeMessage::GetPage { - span, - timeline: this_shard, - rel, - blkno, + span: _, + shard: this_shard, + pages: this_pages, effective_request_lsn: this_lsn, }, ) if async { - if (prev_shard.tenant_shard_id, prev_shard.timeline_id) + assert_eq!(this_pages.len(), 1); + if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize { + assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize); + return false; + } + if (accum_shard.tenant_shard_id, accum_shard.timeline_id) != (this_shard.tenant_shard_id, this_shard.timeline_id) { // TODO: we _could_ batch & execute each shard seperately (and in parallel). @@ -739,18 +736,12 @@ impl PageServerHandler { } // the vectored get currently only supports a single LSN, so, bounce as soon // as the effective request_lsn changes - return *prev_lsn == this_lsn; + return *accum_lsn == this_lsn; } .await => { // ok to batch - batch.push(DebouncedFeMessage::GetPage { - span, - timeline: this_shard, - rel, - blkno, - effective_request_lsn: this_lsn, - }); + accum_pages.extend(this_pages); } (Some(_), this_msg) => { // by default, don't continue batching @@ -764,107 +755,104 @@ impl PageServerHandler { break None; } }; - assert!(!batch.is_empty()); - CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM - .observe(num_consecutive_getpage_requests as f64); - num_consecutive_getpage_requests = 0; - - for msg in batch.drain(..) 
{ - // invoke handler function - let (handler_result, span) = match msg { - DebouncedFeMessage::Exists(req) => { - CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM - .observe(num_consecutive_getpage_requests as f64); - num_consecutive_getpage_requests = 0; - - fail::fail_point!("ps::handle-pagerequest-message::exists"); - let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn); - ( + // invoke handler function + let (handler_results, span): ( + smallvec::SmallVec<[Result; 1]>, + _, + ) = match batched.take().expect("loop above ensures this") { + DebouncedFeMessage::Exists(req) => { + fail::fail_point!("ps::handle-pagerequest-message::exists"); + let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn); + ( + smallvec::smallvec![ self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx) .instrument(span.clone()) - .await, - span, - ) - } - DebouncedFeMessage::Nblocks(req) => { - CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM - .observe(num_consecutive_getpage_requests as f64); - num_consecutive_getpage_requests = 0; - fail::fail_point!("ps::handle-pagerequest-message::nblocks"); - let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn); - ( + .await + ], + span, + ) + } + DebouncedFeMessage::Nblocks(req) => { + fail::fail_point!("ps::handle-pagerequest-message::nblocks"); + let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn); + ( + smallvec::smallvec![ self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx) .instrument(span.clone()) .await, - span, - ) - } - DebouncedFeMessage::GetPage { + ], span, - timeline, - rel, - blkno, - effective_request_lsn, - } => { - num_consecutive_getpage_requests += 1; - fail::fail_point!("ps::handle-pagerequest-message::getpage"); - // shard_id is filled in by the handler - ( - // TODO: issue vectored get - self.handle_get_page_at_lsn_request( - &timeline, - rel, - blkno, - effective_request_lsn, - &ctx, - ) - .instrument(span.clone()) - .await, - span, - ) - } - DebouncedFeMessage::DbSize(req) => { - CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM - .observe(num_consecutive_getpage_requests as f64); - num_consecutive_getpage_requests = 0; - fail::fail_point!("ps::handle-pagerequest-message::dbsize"); - let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn); - ( + ) + } + DebouncedFeMessage::GetPage { + span, + shard, + effective_request_lsn, + pages, + } => { + CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM.observe(pages.len() as f64); + span.record("batch_size", pages.len() as u64); + fail::fail_point!("ps::handle-pagerequest-message::getpage"); + // shard_id is filled in by the handler + ( + { + let npages = pages.len(); + let res = self + .handle_get_page_at_lsn_request_batched( + &shard, + effective_request_lsn, + pages, + &ctx, + ) + .instrument(span.clone()) + .await; + assert_eq!(res.len(), npages); + res + }, + span, + ) + } + DebouncedFeMessage::DbSize(req) => { + fail::fail_point!("ps::handle-pagerequest-message::dbsize"); + let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn); + ( + smallvec::smallvec![ self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx) .instrument(span.clone()) - .await, - span, - ) - } - DebouncedFeMessage::GetSlruSegment(req) => { - 
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM - .observe(num_consecutive_getpage_requests as f64); - num_consecutive_getpage_requests = 0; - fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); - let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn); - ( + .await + ], + span, + ) + } + DebouncedFeMessage::GetSlruSegment(req) => { + fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); + let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn); + ( + smallvec::smallvec![ self.handle_get_slru_segment_request( tenant_id, timeline_id, &req, - &ctx, + &ctx ) .instrument(span.clone()) - .await, - span, - ) - } - DebouncedFeMessage::RespondError(span, e) => { - // We've already decided to respond with an error, so we don't need to - // call the handler. - (Err(e), span) - } - }; + .await + ], + span, + ) + } + DebouncedFeMessage::RespondError(span, e) => { + // We've already decided to respond with an error, so we don't need to + // call the handler. + (smallvec::smallvec![Err(e)], span) + } + }; - // Map handler result to protocol behavior. - // Some handler errors cause exit from pagestream protocol. - // Other handler errors are sent back as an error message and we stay in pagestream protocol. + // Map handler result to protocol behavior. + // Some handler errors cause exit from pagestream protocol. + // Other handler errors are sent back as an error message and we stay in pagestream protocol. + for handler_result in handler_results { let response_msg = match handler_result { Err(e) => match &e { PageStreamError::Shutdown => { @@ -899,21 +887,21 @@ impl PageServerHandler { // marshal & transmit response message pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; - tokio::select! { - biased; - _ = self.cancel.cancelled() => { - // We were requested to shut down. - info!("shutdown request received in page handler"); - return Err(QueryError::Shutdown) - } - res = pgb.flush() => { - res?; - } + } + tokio::select! { + biased; + _ = self.cancel.cancelled() => { + // We were requested to shut down. 
+ info!("shutdown request received in page handler"); + return Err(QueryError::Shutdown) + } + res = pgb.flush() => { + res?; } } - assert!(batch.is_empty()); - batch.extend(after_batch.into_iter()); + assert!(batched.is_none(), "we take() earlier"); + batched = next_batched; } Ok(()) } @@ -1158,25 +1146,29 @@ impl PageServerHandler { } #[instrument(skip_all)] - async fn handle_get_page_at_lsn_request( + async fn handle_get_page_at_lsn_request_batched( &mut self, timeline: &Timeline, - rel: RelTag, - blkno: BlockNumber, effective_lsn: Lsn, + pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>, ctx: &RequestContext, - ) -> Result { + ) -> smallvec::SmallVec<[Result; 1]> { debug_assert_current_span_has_tenant_and_timeline_id(); - let _timer = timeline - .query_metrics - .start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx); + let _timer = timeline.query_metrics.start_timer_many( + metrics::SmgrQueryType::GetPageAtLsn, + pages.len(), + ctx, + ); - let page = timeline - .get_rel_page_at_lsn(rel, blkno, Version::Lsn(effective_lsn), ctx) - .await?; + let pages = timeline + .get_rel_page_at_lsn_batched(pages, Version::Lsn(effective_lsn), ctx) + .await; - Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { - page, + smallvec::SmallVec::from_iter(pages.into_iter().map(|page| { + page.map(|page| { + PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page }) + }) + .map_err(PageStreamError::Read) })) } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 808d4b666e..7cf7c7059c 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -9,12 +9,17 @@ use super::tenant::{PageReconstructError, Timeline}; use crate::context::RequestContext; use crate::keyspace::{KeySpace, KeySpaceAccum}; -use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id; +use crate::span::{ + debug_assert_current_span_has_tenant_and_timeline_id, + debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id, +}; +use crate::tenant::timeline::GetVectoredError; use crate::walrecord::NeonWalRecord; use crate::{aux_file, repository::*}; use anyhow::{ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; use enum_map::Enum; +use itertools::Itertools; use pageserver_api::key::{ dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key, @@ -28,7 +33,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::BLCKSZ; use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId}; use serde::{Deserialize, Serialize}; -use std::collections::{hash_map, HashMap, HashSet}; +use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; use std::ops::ControlFlow; use std::ops::Range; use strum::IntoEnumIterator; @@ -191,26 +196,184 @@ impl Timeline { version: Version<'_>, ctx: &RequestContext, ) -> Result { - if tag.relnode == 0 { - return Err(PageReconstructError::Other( - RelationError::InvalidRelnode.into(), - )); - } + let pages = smallvec::smallvec![(tag, blknum)]; + let res = self.get_rel_page_at_lsn_batched(pages, version, ctx).await; + assert_eq!(res.len(), 1); + res.into_iter().next().unwrap() + } - let nblocks = self.get_rel_size(tag, version, ctx).await?; - if blknum >= nblocks { - debug!( - "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page", - tag, - blknum, - version.get_lsn(), - nblocks - ); - return Ok(ZERO_PAGE.clone()); + /// 
Like [`get_rel_page_at_lsn`], but fetches a batch of pages and returns their results in request order.
+    pub(crate) async fn get_rel_page_at_lsn_batched(
+        &self,
+        pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
+        version: Version<'_>,
+        ctx: &RequestContext,
+    ) -> smallvec::SmallVec<[Result<Bytes, PageReconstructError>; 1]> {
+        debug_assert_current_span_has_tenant_and_timeline_id();
+        let request_lsn = match version {
+            Version::Lsn(lsn) => lsn,
+            Version::Modified(_) => panic!("unsupported"),
+        };
+        enum KeyState {
+            NeedsVectoredGet,
+            Done(Result<Bytes, PageReconstructError>),
+        }
+        let mut key_states = BTreeMap::new();
+        let mut vectored_gets: smallvec::SmallVec<[_; 1]> =
+            smallvec::SmallVec::with_capacity(pages.len());
+        for (response_order, (tag, blknum)) in pages.into_iter().enumerate() {
+            let key = rel_block_to_key(tag, blknum);
+            use std::collections::btree_map::Entry;
+            let key_state_slot = match key_states.entry((key, response_order)) {
+                Entry::Occupied(_entry) => unreachable!(
+                    "enumerate makes keys unique, even if batch contains same key twice"
+                ),
+                Entry::Vacant(entry) => entry,
+            };
-        let key = rel_block_to_key(tag, blknum);
-        version.get(self, key, ctx).await
+            if tag.relnode == 0 {
+                key_state_slot.insert(KeyState::Done(Err(PageReconstructError::Other(
+                    RelationError::InvalidRelnode.into(),
+                ))));
+                continue;
+            }
+
+            let nblocks = match self.get_rel_size(tag, version, ctx).await {
+                Ok(nblocks) => nblocks,
+                Err(err) => {
+                    key_state_slot.insert(KeyState::Done(Err(err)));
+                    continue;
+                }
+            };
+            if blknum >= nblocks {
+                debug!(
+                    "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
+                    tag,
+                    blknum,
+                    version.get_lsn(),
+                    nblocks
+                );
+                key_state_slot.insert(KeyState::Done(Ok(ZERO_PAGE.clone())));
+                continue;
+            }
+
+            vectored_gets.push(key);
+            key_state_slot.insert(KeyState::NeedsVectoredGet);
+        }
+        // turn vectored_gets into a keyspace
+        let keyspace = {
+            // add_key requires monotonicity
+            vectored_gets.sort_unstable();
+            let mut acc = KeySpaceAccum::new();
+            for key in vectored_gets
+                .into_iter()
+                // in fact it requires strong monotonicity
+                .dedup()
+            {
+                acc.add_key(key);
+            }
+            acc.to_keyspace()
+        };
+
+        match self.get_vectored(keyspace, request_lsn, ctx).await {
+            Ok(results) => {
+                for (key, res) in results {
+                    if let Err(err) = &res {
+                        warn!(%key, ?err, "a key inside get_vectored failed with a per-key error");
+                    }
+                    let mut interests = key_states.range_mut((key, 0)..(key.next(), 0)).peekable();
+                    let first_interest = interests.next().unwrap();
+                    let next_interest = interests.peek().is_some();
+                    if !next_interest {
+                        match first_interest.1 {
+                            KeyState::NeedsVectoredGet => {
+                                *first_interest.1 = KeyState::Done(res);
+                            }
+                            KeyState::Done(_) => unreachable!(),
+                        }
+                        continue;
+                    } else {
+                        for ((_, _), state) in [first_interest].into_iter().chain(interests) {
+                            match state {
+                                KeyState::NeedsVectoredGet => {
+                                    *state = KeyState::Done(match &res {
+                                        Ok(buf) => Ok(buf.clone()),
+                                        // this `match` is working around the fact that we cannot Clone the PageReconstructError
+                                        Err(err) => Err(match err {
+                                            PageReconstructError::Cancelled => {
+                                                PageReconstructError::Cancelled
+                                            }
+
+                                            x @ PageReconstructError::Other(_) |
+                                            x @ PageReconstructError::AncestorLsnTimeout(_) |
+                                            x @ PageReconstructError::WalRedo(_) |
+                                            x @ PageReconstructError::MissingKey(_) => {
+                                                PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
+                                            },
+                                        }),
+                                    });
+                                }
+                                KeyState::Done(_) => unreachable!(),
+                            }
+                        }
+                    }
+                }
+            }
+            Err(err) => {
+                warn!(?err, "get_vectored failed with a global error, mapping that error to per-key failure");
+                // this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
+                for ((_, _), state) in key_states.iter_mut() {
+                    // this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
+                    // but without taking ownership of the GetVectoredError
+                    match &err {
+                        GetVectoredError::Cancelled => {
+                            *state = KeyState::Done(Err(PageReconstructError::Cancelled));
+                        }
+                        // TODO: restructure get_vectored API to make this error per-key
+                        GetVectoredError::MissingKey(err) => {
+                            *state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}"))));
+                        }
+                        // TODO: restructure get_vectored API to make this error per-key
+                        GetVectoredError::GetReadyAncestorError(err) => {
+                            *state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more keys required an ancestor that wasn't ready: {err:?}"))));
+                        }
+                        // TODO: restructure get_vectored API to make this error per-key
+                        GetVectoredError::Other(err) => {
+                            *state = KeyState::Done(Err(PageReconstructError::Other(
+                                anyhow::anyhow!("whole vectored get request failed: {err:?}"),
+                            )));
+                        }
+                        // TODO: we can prevent this error class by moving this check into the type system
+                        GetVectoredError::InvalidLsn(e) => {
+                            *state =
+                                KeyState::Done(Err(anyhow::anyhow!("invalid LSN: {e:?}").into()));
+                        }
+                        // NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
+                        // TODO: we can prevent this error class by moving this check into the type system
+                        GetVectoredError::Oversized(err) => {
+                            *state = KeyState::Done(Err(anyhow::anyhow!(
+                                "batching oversized: {err:?}"
+                            )
+                            .into()));
+                        }
+                    }
+                }
+            }
+        };
+
+        // get the results into the order in which they were requested
+        let mut return_order: smallvec::SmallVec<[_; Timeline::MAX_GET_VECTORED_KEYS as usize]> =
+            smallvec::SmallVec::with_capacity(key_states.len());
+        return_order.extend(key_states.keys().map(|(key, idx)| (*key, *idx)));
+        return_order.sort_unstable_by_key(|(_, idx)| *idx);
+        let mut res = smallvec::SmallVec::with_capacity(key_states.len());
+        res.extend(return_order.into_iter().map(|key_states_key| {
+            match key_states.remove(&key_states_key).unwrap() {
+                KeyState::Done(res) => res,
+                KeyState::NeedsVectoredGet => unreachable!(),
+            }
+        }));
+        res
+    }

     // Get size of a database in blocks
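
The core technique in `get_rel_page_at_lsn_batched` above is: keep duplicate requests for the same key distinct by tracking `(key, response_order)` in a `BTreeMap`, issue a single deduplicated vectored read, fan each per-key result back out to every request that asked for it, and finally reorder results so the response stream stays 1:1 with the request stream. The following is a minimal standalone sketch of that ordering/dedup pattern only; `Key`, `Page`, `get_vectored`, and `get_batched` are hypothetical stand-ins, not the pageserver's types or APIs, and error handling is reduced to strings.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins; only the dedup + fan-out + reorder logic is illustrated.
type Key = u64;
type Page = Vec<u8>;

// Placeholder for a single "vectored" read over a set of unique keys.
fn get_vectored(keys: &[Key]) -> BTreeMap<Key, Result<Page, String>> {
    keys.iter().map(|&k| (k, Ok(vec![k as u8]))).collect()
}

// Serve a batch of possibly-duplicated keys with one vectored read,
// returning results in the order the requests arrived.
fn get_batched(requests: &[Key]) -> Vec<Result<Page, String>> {
    // (key, request index) keeps duplicate keys distinct, like the (Key, response_order) map above.
    let mut pending: BTreeMap<(Key, usize), Option<Result<Page, String>>> = BTreeMap::new();
    for (idx, &key) in requests.iter().enumerate() {
        pending.insert((key, idx), None);
    }

    // Deduplicate keys before issuing the single vectored read.
    let mut unique: Vec<Key> = requests.to_vec();
    unique.sort_unstable();
    unique.dedup();
    let results = get_vectored(&unique);

    // Fan each per-key result back out to every request that asked for that key.
    for ((key, _idx), slot) in pending.iter_mut() {
        let res = results
            .get(key)
            .cloned()
            .unwrap_or_else(|| Err("missing".to_string()));
        *slot = Some(res);
    }

    // Restore the original request order before returning.
    let mut ordered: Vec<((Key, usize), Option<Result<Page, String>>)> =
        pending.into_iter().collect();
    ordered.sort_unstable_by_key(|((_, idx), _)| *idx);
    ordered.into_iter().map(|(_, slot)| slot.unwrap()).collect()
}

fn main() {
    // Two requests for key 7: both get an answer, and order matches the request order.
    let out = get_batched(&[7, 3, 7]);
    assert_eq!(out.len(), 3);
    println!("{out:?}");
}
```

The design point the sketch mirrors is that the caller (the page_service batching loop) relies on getting exactly one response per request, in order, so duplicates in a batch must not collapse and the vectored read's per-key results must be re-expanded and re-sorted rather than returned in key order.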