From 2d7432231f13f14200d2e592a8231aca2e9d6880 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Mon, 4 Mar 2024 15:36:47 +0000 Subject: [PATCH] GetVectoredPages --- libs/pageserver_api/src/models.rs | 61 +++++++++++++ pageserver/client/src/page_service.rs | 37 +++++++- pageserver/src/page_service.rs | 99 ++++++++++++++++++++ pageserver/src/pgdatadir_mapping.rs | 126 +++++++++++++++++++++++++- trace/src/main.rs | 1 + 5 files changed, 320 insertions(+), 4 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index d583866290..8ed27409e1 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -738,6 +738,7 @@ pub enum PagestreamFeMessage { GetPage(PagestreamGetPageRequest), DbSize(PagestreamDbSizeRequest), GetSlruSegment(PagestreamGetSlruSegmentRequest), + GetVectoredPages(PagestreamGetVectoredPagesRequest), } // Wrapped in libpq CopyData @@ -749,6 +750,7 @@ pub enum PagestreamBeMessage { Error(PagestreamErrorResponse), DbSize(PagestreamDbSizeResponse), GetSlruSegment(PagestreamGetSlruSegmentResponse), + GetVectoredPages(PagestreamGetVectoredPagesResponse), } // Keep in sync with `pagestore_client.h` @@ -760,6 +762,7 @@ enum PagestreamBeMessageTag { Error = 103, DbSize = 104, GetSlruSegment = 105, + GetVectoredPages = 106, } impl TryFrom for PagestreamBeMessageTag { type Error = u8; @@ -771,6 +774,7 @@ impl TryFrom for PagestreamBeMessageTag { 103 => Ok(PagestreamBeMessageTag::Error), 104 => Ok(PagestreamBeMessageTag::DbSize), 105 => Ok(PagestreamBeMessageTag::GetSlruSegment), + 106 => Ok(PagestreamBeMessageTag::GetVectoredPages), _ => Err(value), } } @@ -813,6 +817,15 @@ pub struct PagestreamGetSlruSegmentRequest { pub segno: u32, } +#[derive(Debug, PartialEq, Eq)] +pub struct PagestreamGetVectoredPagesRequest { + pub latest: bool, + pub lsn: Lsn, + pub rel: RelTag, + pub blkno: u32, + pub count: u8, +} + #[derive(Debug)] pub struct PagestreamExistsResponse { pub exists: bool, @@ -833,6 +846,12 @@ pub struct PagestreamGetSlruSegmentResponse { pub segment: Bytes, } +#[derive(Debug)] +pub struct PagestreamGetVectoredPagesResponse { + pub page_count: u8, + pub pages: Bytes, +} + #[derive(Debug)] pub struct PagestreamErrorResponse { pub message: String, @@ -904,6 +923,18 @@ impl PagestreamFeMessage { bytes.put_u8(req.kind); bytes.put_u32(req.segno); } + + Self::GetVectoredPages(req) => { + bytes.put_u8(5); + bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.lsn.0); + bytes.put_u32(req.rel.spcnode); + bytes.put_u32(req.rel.dbnode); + bytes.put_u32(req.rel.relnode); + bytes.put_u8(req.rel.forknum); + bytes.put_u32(req.blkno); + bytes.put_u8(req.count); + } } bytes.into() @@ -962,6 +993,20 @@ impl PagestreamFeMessage { segno: body.read_u32::()?, }, )), + 5 => Ok(PagestreamFeMessage::GetVectoredPages( + PagestreamGetVectoredPagesRequest { + latest: body.read_u8()? != 0, + lsn: Lsn::from(body.read_u64::()?), + rel: RelTag { + spcnode: body.read_u32::()?, + dbnode: body.read_u32::()?, + relnode: body.read_u32::()?, + forknum: body.read_u8()?, + }, + blkno: body.read_u32::()?, + count: body.read_u8()?, + }, + )), _ => bail!("unknown smgr message tag: {:?}", msg_tag), } } @@ -1003,6 +1048,12 @@ impl PagestreamBeMessage { bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); bytes.put(&resp.segment[..]); } + + Self::GetVectoredPages(resp) => { + bytes.put_u8(Tag::GetVectoredPages as u8); + bytes.put_u8(resp.page_count); + bytes.put(&resp.pages[..]); + } } bytes.into() @@ -1051,6 +1102,15 @@ impl PagestreamBeMessage { segment: segment.into(), }) } + Tag::GetVectoredPages => { + let page_count = buf.read_u8()?; + let mut pages = vec![0; page_count as usize * 8192]; + buf.read_exact(&mut pages)?; + Self::GetVectoredPages(PagestreamGetVectoredPagesResponse { + page_count, + pages: pages.into(), + }) + } }; let remaining = buf.into_inner(); if !remaining.is_empty() { @@ -1070,6 +1130,7 @@ impl PagestreamBeMessage { Self::Error(_) => "Error", Self::DbSize(_) => "DbSize", Self::GetSlruSegment(_) => "GetSlruSegment", + Self::GetVectoredPages(_) => "GetVectoredPages", } } } diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs index 49175b3b90..0d7f9a1440 100644 --- a/pageserver/client/src/page_service.rs +++ b/pageserver/client/src/page_service.rs @@ -4,7 +4,8 @@ use futures::SinkExt; use pageserver_api::{ models::{ PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, - PagestreamGetPageResponse, + PagestreamGetPageResponse, PagestreamGetVectoredPagesRequest, + PagestreamGetVectoredPagesResponse, }, reltag::RelTag, }; @@ -157,7 +158,39 @@ impl PagestreamClient { PagestreamBeMessage::Exists(_) | PagestreamBeMessage::Nblocks(_) | PagestreamBeMessage::DbSize(_) - | PagestreamBeMessage::GetSlruSegment(_) => { + | PagestreamBeMessage::GetSlruSegment(_) + | PagestreamBeMessage::GetVectoredPages(_) => { + anyhow::bail!( + "unexpected be message kind in response to getpage request: {}", + msg.kind() + ) + } + } + } + + pub async fn getpages( + &mut self, + req: PagestreamGetVectoredPagesRequest, + ) -> anyhow::Result { + let req = PagestreamFeMessage::GetVectoredPages(req); + let req: bytes::Bytes = req.serialize(); + // let mut req = tokio_util::io::ReaderStream::new(&req); + let mut req = tokio_stream::once(Ok(req)); + + self.copy_both.send_all(&mut req).await?; + + let next: Option> = self.copy_both.next().await; + let next: bytes::Bytes = next.unwrap()?; + + let msg = PagestreamBeMessage::deserialize(next)?; + match msg { + PagestreamBeMessage::GetVectoredPages(p) => Ok(p), + PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e), + PagestreamBeMessage::Exists(_) + | PagestreamBeMessage::Nblocks(_) + | PagestreamBeMessage::DbSize(_) + | PagestreamBeMessage::GetSlruSegment(_) + | PagestreamBeMessage::GetPage(_) => { anyhow::bail!( "unexpected be message kind in response to getpage request: {}", msg.kind() diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 689bc5cb3c..6c383ee1c3 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -17,6 +17,8 @@ use futures::stream::FuturesUnordered; use futures::Stream; use futures::StreamExt; use pageserver_api::key::Key; +use pageserver_api::models::PagestreamGetVectoredPagesRequest; +use pageserver_api::models::PagestreamGetVectoredPagesResponse; use pageserver_api::models::TenantState; use pageserver_api::models::{ PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse, @@ -70,6 +72,7 @@ use crate::tenant::mgr; use crate::tenant::mgr::get_active_tenant_with_timeout; use crate::tenant::mgr::GetActiveTenantError; use crate::tenant::mgr::ShardSelector; +use crate::tenant::timeline::GetVectoredError; use crate::tenant::timeline::WaitLsnError; use crate::tenant::GetTimelineError; use crate::tenant::PageReconstructError; @@ -333,6 +336,10 @@ enum PageStreamError { #[error("Read error")] Read(#[source] PageReconstructError), + /// Something went wrong reading a page: this likely indicates a pageserver bug + #[error("Vectored read error")] + VectoredRead(#[source] GetVectoredError), + /// Ran out of time waiting for an LSN #[error("LSN timeout: {0}")] LsnTimeout(WaitLsnError), @@ -356,6 +363,15 @@ impl From for PageStreamError { } } +impl From for PageStreamError { + fn from(value: GetVectoredError) -> Self { + match value { + GetVectoredError::Cancelled => Self::Shutdown, + e => Self::VectoredRead(e), + } + } +} + impl From for PageStreamError { fn from(value: GetActiveTimelineError) -> Self { match value { @@ -665,6 +681,15 @@ impl PageServerHandler { span, ) } + PagestreamFeMessage::GetVectoredPages(req) => { + let span = tracing::info_span!("handle_get_vectored_pages_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn, req_count = %req.count); + ( + self.handle_get_pages_at_lsn_request(tenant_id, timeline_id, &req, &ctx) + .instrument(span.clone()) + .await, + span, + ) + } }; match response { @@ -1160,6 +1185,80 @@ impl PageServerHandler { })) } + #[instrument(skip_all, fields(shard_id))] + async fn handle_get_pages_at_lsn_request( + &mut self, + tenant_id: TenantId, + timeline_id: TimelineId, + req: &PagestreamGetVectoredPagesRequest, + ctx: &RequestContext, + ) -> Result { + // This is cheeky and relies on not using sharding :) + // A real solution has to split the requested key sequence between shards. + let get_page_request = PagestreamGetPageRequest { + latest: req.latest, + lsn: req.lsn, + rel: req.rel, + blkno: req.blkno, + }; + + let timeline = match self.get_cached_timeline_for_page(&get_page_request) { + Ok(tl) => tl, + Err(key) => { + match self + .load_timeline_for_page(tenant_id, timeline_id, key) + .await + { + Ok(t) => t, + Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { + // We already know this tenant exists in general, because we resolved it at + // start of connection. Getting a NotFound here indicates that the shard containing + // the requested page is not present on this node: the client's knowledge of shard->pageserver + // mapping is out of date. + // + // Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via + // client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration + // and talk to a different pageserver. + return Err(PageStreamError::Reconnect( + "getpage@lsn request routed to wrong shard".into(), + )); + } + Err(e) => return Err(e.into()), + } + } + }; + + // load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't + set_tracing_field_shard_id(timeline); + + let _timer = timeline + .query_metrics + .start_timer(metrics::SmgrQueryType::GetPageAtLsn); + + let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); + let lsn = + Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx) + .await?; + + let (page_count, pages_buf) = timeline + .get_rel_pages_at_lsn( + req.rel, + req.blkno, + req.count, + Version::Lsn(lsn), + req.latest, + ctx, + ) + .await?; + + Ok(PagestreamBeMessage::GetVectoredPages( + PagestreamGetVectoredPagesResponse { + page_count, + pages: pages_buf, + }, + )) + } + #[instrument(skip_all, fields(shard_id))] async fn handle_get_slru_segment_request( &mut self, diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 7be08f86b1..ea06f8d7ec 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -11,8 +11,9 @@ use crate::context::RequestContext; use crate::keyspace::{KeySpace, KeySpaceAccum}; use crate::repository::*; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id; +use crate::tenant::timeline::GetVectoredError; use crate::walrecord::NeonWalRecord; -use anyhow::{ensure, Context}; +use anyhow::{anyhow, ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; use enum_map::Enum; use pageserver_api::key::{ @@ -26,7 +27,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::BLCKSZ; use postgres_ffi::{Oid, TimestampTz, TransactionId}; use serde::{Deserialize, Serialize}; -use std::collections::{hash_map, HashMap, HashSet}; +use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; use std::ops::ControlFlow; use std::ops::Range; use strum::IntoEnumIterator; @@ -198,6 +199,41 @@ impl Timeline { version.get(self, key, ctx).await } + pub(crate) async fn get_rel_pages_at_lsn( + &self, + tag: RelTag, + blknum: BlockNumber, + count: u8, + version: Version<'_>, + latest: bool, + ctx: &RequestContext, + ) -> Result<(u8, Bytes), GetVectoredError> { + if tag.relnode == 0 { + return Err(GetVectoredError::Other( + RelationError::InvalidRelnode.into(), + )); + } + + let nblocks = self + .get_rel_size(tag, version, latest, ctx) + .await + .map_err(|e| GetVectoredError::Other(anyhow!(e)))?; + if blknum + (count - 1) as u32 >= nblocks { + debug!( + "read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page", + tag, + blknum, + version.get_lsn(), + nblocks + ); + return Ok((1, ZERO_PAGE.clone())); + } + + let start_key = rel_block_to_key(tag, blknum); + let end_key = start_key.add(count as u32); + version.get_vectored(self, start_key..end_key, ctx).await + } + // Get size of a database in blocks pub(crate) async fn get_db_size( &self, @@ -1604,6 +1640,55 @@ impl<'a> DatadirModification<'a> { self.tline.get(key, lsn, ctx).await } + async fn get_vectored( + &self, + key_range: Range, + ctx: &RequestContext, + ) -> Result>, GetVectoredError> { + // Have we already updated the same key? Read the latest pending updated + // version in that case. + // + // Note: we don't check pending_deletions. It is an error to request a + // value that has been removed, deletion only avoids leaking storage. + let mut results: BTreeMap> = BTreeMap::new(); + let mut keys_in_modification = KeySpaceAccum::new(); + + let key = key_range.start; + while key != key_range.end { + if let Some(values) = self.pending_updates.get(&key) { + if let Some((_, value)) = values.last() { + keys_in_modification.add_key(key); + + match value { + Value::Image(img) => { + results.insert(key, Ok(img.clone())); + } + _ => { + results.insert( + key, + Err(PageReconstructError::from(anyhow::anyhow!( + "unexpected pending WAL record" + ))), + ); + } + } + } + } + } + + let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn); + + let mut keyspace = KeySpace { + ranges: vec![key_range], + }; + keyspace.remove_overlapping_with(&keys_in_modification.to_keyspace()); + + let pages = self.tline.get_vectored(keyspace, lsn, ctx).await?; + results.extend(pages.into_iter()); + + Ok(results) + } + fn put(&mut self, key: Key, val: Value) { let values = self.pending_updates.entry(key).or_default(); // Replace the previous value if it exists at the same lsn @@ -1647,6 +1732,43 @@ impl<'a> Version<'a> { } } + async fn get_vectored( + &self, + timeline: &Timeline, + key_range: Range, + ctx: &RequestContext, + ) -> Result<(u8, Bytes), GetVectoredError> { + let pages = match self { + Version::Lsn(lsn) => { + timeline + .get_vectored( + KeySpace { + ranges: vec![key_range], + }, + *lsn, + ctx, + ) + .await + } + Version::Modified(modification) => modification.get_vectored(key_range, ctx).await, + }?; + + let mut buf = BytesMut::new(); + let page_count: u8 = pages.len().try_into().expect("too many pages returned"); + for page in pages { + match page { + (_key, Ok(bytes)) => { + buf.extend_from_slice(&bytes[..]); + } + (_key, Err(err)) => { + return Err(GetVectoredError::Other(anyhow!(err))); + } + } + } + + Ok((page_count, buf.freeze())) + } + fn get_lsn(&self) -> Lsn { match self { Version::Lsn(lsn) => *lsn, diff --git a/trace/src/main.rs b/trace/src/main.rs index 4605c124e9..b9acbda4c4 100644 --- a/trace/src/main.rs +++ b/trace/src/main.rs @@ -74,6 +74,7 @@ fn analyze_trace(mut reader: R) { } prev = Some(req); } + PagestreamFeMessage::GetVectoredPages(_) => {} PagestreamFeMessage::DbSize(_) => {} }; }