mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 22:20:37 +00:00
GetVectoredPages
This commit is contained in:
@@ -738,6 +738,7 @@ pub enum PagestreamFeMessage {
|
||||
GetPage(PagestreamGetPageRequest),
|
||||
DbSize(PagestreamDbSizeRequest),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentRequest),
|
||||
GetVectoredPages(PagestreamGetVectoredPagesRequest),
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
@@ -749,6 +750,7 @@ pub enum PagestreamBeMessage {
|
||||
Error(PagestreamErrorResponse),
|
||||
DbSize(PagestreamDbSizeResponse),
|
||||
GetSlruSegment(PagestreamGetSlruSegmentResponse),
|
||||
GetVectoredPages(PagestreamGetVectoredPagesResponse),
|
||||
}
|
||||
|
||||
// Keep in sync with `pagestore_client.h`
|
||||
@@ -760,6 +762,7 @@ enum PagestreamBeMessageTag {
|
||||
Error = 103,
|
||||
DbSize = 104,
|
||||
GetSlruSegment = 105,
|
||||
GetVectoredPages = 106,
|
||||
}
|
||||
impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
type Error = u8;
|
||||
@@ -771,6 +774,7 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
103 => Ok(PagestreamBeMessageTag::Error),
|
||||
104 => Ok(PagestreamBeMessageTag::DbSize),
|
||||
105 => Ok(PagestreamBeMessageTag::GetSlruSegment),
|
||||
106 => Ok(PagestreamBeMessageTag::GetVectoredPages),
|
||||
_ => Err(value),
|
||||
}
|
||||
}
|
||||
@@ -813,6 +817,15 @@ pub struct PagestreamGetSlruSegmentRequest {
|
||||
pub segno: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamGetVectoredPagesRequest {
|
||||
pub latest: bool,
|
||||
pub lsn: Lsn,
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
pub count: u8,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamExistsResponse {
|
||||
pub exists: bool,
|
||||
@@ -833,6 +846,12 @@ pub struct PagestreamGetSlruSegmentResponse {
|
||||
pub segment: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamGetVectoredPagesResponse {
|
||||
pub page_count: u8,
|
||||
pub pages: Bytes,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PagestreamErrorResponse {
|
||||
pub message: String,
|
||||
@@ -904,6 +923,18 @@ impl PagestreamFeMessage {
|
||||
bytes.put_u8(req.kind);
|
||||
bytes.put_u32(req.segno);
|
||||
}
|
||||
|
||||
Self::GetVectoredPages(req) => {
|
||||
bytes.put_u8(5);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
bytes.put_u32(req.rel.relnode);
|
||||
bytes.put_u8(req.rel.forknum);
|
||||
bytes.put_u32(req.blkno);
|
||||
bytes.put_u8(req.count);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
@@ -962,6 +993,20 @@ impl PagestreamFeMessage {
|
||||
segno: body.read_u32::<BigEndian>()?,
|
||||
},
|
||||
)),
|
||||
5 => Ok(PagestreamFeMessage::GetVectoredPages(
|
||||
PagestreamGetVectoredPagesRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
relnode: body.read_u32::<BigEndian>()?,
|
||||
forknum: body.read_u8()?,
|
||||
},
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
count: body.read_u8()?,
|
||||
},
|
||||
)),
|
||||
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
|
||||
}
|
||||
}
|
||||
@@ -1003,6 +1048,12 @@ impl PagestreamBeMessage {
|
||||
bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32);
|
||||
bytes.put(&resp.segment[..]);
|
||||
}
|
||||
|
||||
Self::GetVectoredPages(resp) => {
|
||||
bytes.put_u8(Tag::GetVectoredPages as u8);
|
||||
bytes.put_u8(resp.page_count);
|
||||
bytes.put(&resp.pages[..]);
|
||||
}
|
||||
}
|
||||
|
||||
bytes.into()
|
||||
@@ -1051,6 +1102,15 @@ impl PagestreamBeMessage {
|
||||
segment: segment.into(),
|
||||
})
|
||||
}
|
||||
Tag::GetVectoredPages => {
|
||||
let page_count = buf.read_u8()?;
|
||||
let mut pages = vec![0; page_count as usize * 8192];
|
||||
buf.read_exact(&mut pages)?;
|
||||
Self::GetVectoredPages(PagestreamGetVectoredPagesResponse {
|
||||
page_count,
|
||||
pages: pages.into(),
|
||||
})
|
||||
}
|
||||
};
|
||||
let remaining = buf.into_inner();
|
||||
if !remaining.is_empty() {
|
||||
@@ -1070,6 +1130,7 @@ impl PagestreamBeMessage {
|
||||
Self::Error(_) => "Error",
|
||||
Self::DbSize(_) => "DbSize",
|
||||
Self::GetSlruSegment(_) => "GetSlruSegment",
|
||||
Self::GetVectoredPages(_) => "GetVectoredPages",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,8 @@ use futures::SinkExt;
|
||||
use pageserver_api::{
|
||||
models::{
|
||||
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
|
||||
PagestreamGetPageResponse,
|
||||
PagestreamGetPageResponse, PagestreamGetVectoredPagesRequest,
|
||||
PagestreamGetVectoredPagesResponse,
|
||||
},
|
||||
reltag::RelTag,
|
||||
};
|
||||
@@ -157,7 +158,39 @@ impl PagestreamClient {
|
||||
PagestreamBeMessage::Exists(_)
|
||||
| PagestreamBeMessage::Nblocks(_)
|
||||
| PagestreamBeMessage::DbSize(_)
|
||||
| PagestreamBeMessage::GetSlruSegment(_) => {
|
||||
| PagestreamBeMessage::GetSlruSegment(_)
|
||||
| PagestreamBeMessage::GetVectoredPages(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
msg.kind()
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn getpages(
|
||||
&mut self,
|
||||
req: PagestreamGetVectoredPagesRequest,
|
||||
) -> anyhow::Result<PagestreamGetVectoredPagesResponse> {
|
||||
let req = PagestreamFeMessage::GetVectoredPages(req);
|
||||
let req: bytes::Bytes = req.serialize();
|
||||
// let mut req = tokio_util::io::ReaderStream::new(&req);
|
||||
let mut req = tokio_stream::once(Ok(req));
|
||||
|
||||
self.copy_both.send_all(&mut req).await?;
|
||||
|
||||
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
|
||||
let next: bytes::Bytes = next.unwrap()?;
|
||||
|
||||
let msg = PagestreamBeMessage::deserialize(next)?;
|
||||
match msg {
|
||||
PagestreamBeMessage::GetVectoredPages(p) => Ok(p),
|
||||
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
|
||||
PagestreamBeMessage::Exists(_)
|
||||
| PagestreamBeMessage::Nblocks(_)
|
||||
| PagestreamBeMessage::DbSize(_)
|
||||
| PagestreamBeMessage::GetSlruSegment(_)
|
||||
| PagestreamBeMessage::GetPage(_) => {
|
||||
anyhow::bail!(
|
||||
"unexpected be message kind in response to getpage request: {}",
|
||||
msg.kind()
|
||||
|
||||
@@ -17,6 +17,8 @@ use futures::stream::FuturesUnordered;
|
||||
use futures::Stream;
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::models::PagestreamGetVectoredPagesRequest;
|
||||
use pageserver_api::models::PagestreamGetVectoredPagesResponse;
|
||||
use pageserver_api::models::TenantState;
|
||||
use pageserver_api::models::{
|
||||
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
|
||||
@@ -70,6 +72,7 @@ use crate::tenant::mgr;
|
||||
use crate::tenant::mgr::get_active_tenant_with_timeout;
|
||||
use crate::tenant::mgr::GetActiveTenantError;
|
||||
use crate::tenant::mgr::ShardSelector;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::timeline::WaitLsnError;
|
||||
use crate::tenant::GetTimelineError;
|
||||
use crate::tenant::PageReconstructError;
|
||||
@@ -333,6 +336,10 @@ enum PageStreamError {
|
||||
#[error("Read error")]
|
||||
Read(#[source] PageReconstructError),
|
||||
|
||||
/// Something went wrong reading a page: this likely indicates a pageserver bug
|
||||
#[error("Vectored read error")]
|
||||
VectoredRead(#[source] GetVectoredError),
|
||||
|
||||
/// Ran out of time waiting for an LSN
|
||||
#[error("LSN timeout: {0}")]
|
||||
LsnTimeout(WaitLsnError),
|
||||
@@ -356,6 +363,15 @@ impl From<PageReconstructError> for PageStreamError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetVectoredError> for PageStreamError {
|
||||
fn from(value: GetVectoredError) -> Self {
|
||||
match value {
|
||||
GetVectoredError::Cancelled => Self::Shutdown,
|
||||
e => Self::VectoredRead(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetActiveTimelineError> for PageStreamError {
|
||||
fn from(value: GetActiveTimelineError) -> Self {
|
||||
match value {
|
||||
@@ -665,6 +681,15 @@ impl PageServerHandler {
|
||||
span,
|
||||
)
|
||||
}
|
||||
PagestreamFeMessage::GetVectoredPages(req) => {
|
||||
let span = tracing::info_span!("handle_get_vectored_pages_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn, req_count = %req.count);
|
||||
(
|
||||
self.handle_get_pages_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await,
|
||||
span,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
match response {
|
||||
@@ -1160,6 +1185,80 @@ impl PageServerHandler {
|
||||
}))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_pages_at_lsn_request(
|
||||
&mut self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
req: &PagestreamGetVectoredPagesRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
// This is cheeky and relies on not using sharding :)
|
||||
// A real solution has to split the requested key sequence between shards.
|
||||
let get_page_request = PagestreamGetPageRequest {
|
||||
latest: req.latest,
|
||||
lsn: req.lsn,
|
||||
rel: req.rel,
|
||||
blkno: req.blkno,
|
||||
};
|
||||
|
||||
let timeline = match self.get_cached_timeline_for_page(&get_page_request) {
|
||||
Ok(tl) => tl,
|
||||
Err(key) => {
|
||||
match self
|
||||
.load_timeline_for_page(tenant_id, timeline_id, key)
|
||||
.await
|
||||
{
|
||||
Ok(t) => t,
|
||||
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
|
||||
// We already know this tenant exists in general, because we resolved it at
|
||||
// start of connection. Getting a NotFound here indicates that the shard containing
|
||||
// the requested page is not present on this node: the client's knowledge of shard->pageserver
|
||||
// mapping is out of date.
|
||||
//
|
||||
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
|
||||
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
|
||||
// and talk to a different pageserver.
|
||||
return Err(PageStreamError::Reconnect(
|
||||
"getpage@lsn request routed to wrong shard".into(),
|
||||
));
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// load_timeline_for_page sets shard_id, but get_cached_timeline_for_page doesn't
|
||||
set_tracing_field_shard_id(timeline);
|
||||
|
||||
let _timer = timeline
|
||||
.query_metrics
|
||||
.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let (page_count, pages_buf) = timeline
|
||||
.get_rel_pages_at_lsn(
|
||||
req.rel,
|
||||
req.blkno,
|
||||
req.count,
|
||||
Version::Lsn(lsn),
|
||||
req.latest,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetVectoredPages(
|
||||
PagestreamGetVectoredPagesResponse {
|
||||
page_count,
|
||||
pages: pages_buf,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_slru_segment_request(
|
||||
&mut self,
|
||||
|
||||
@@ -11,8 +11,9 @@ use crate::context::RequestContext;
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum};
|
||||
use crate::repository::*;
|
||||
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::{ensure, Context};
|
||||
use anyhow::{anyhow, ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use pageserver_api::key::{
|
||||
@@ -26,7 +27,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use postgres_ffi::{Oid, TimestampTz, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::{hash_map, HashMap, HashSet};
|
||||
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
|
||||
use std::ops::ControlFlow;
|
||||
use std::ops::Range;
|
||||
use strum::IntoEnumIterator;
|
||||
@@ -198,6 +199,41 @@ impl Timeline {
|
||||
version.get(self, key, ctx).await
|
||||
}
|
||||
|
||||
pub(crate) async fn get_rel_pages_at_lsn(
|
||||
&self,
|
||||
tag: RelTag,
|
||||
blknum: BlockNumber,
|
||||
count: u8,
|
||||
version: Version<'_>,
|
||||
latest: bool,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(u8, Bytes), GetVectoredError> {
|
||||
if tag.relnode == 0 {
|
||||
return Err(GetVectoredError::Other(
|
||||
RelationError::InvalidRelnode.into(),
|
||||
));
|
||||
}
|
||||
|
||||
let nblocks = self
|
||||
.get_rel_size(tag, version, latest, ctx)
|
||||
.await
|
||||
.map_err(|e| GetVectoredError::Other(anyhow!(e)))?;
|
||||
if blknum + (count - 1) as u32 >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
tag,
|
||||
blknum,
|
||||
version.get_lsn(),
|
||||
nblocks
|
||||
);
|
||||
return Ok((1, ZERO_PAGE.clone()));
|
||||
}
|
||||
|
||||
let start_key = rel_block_to_key(tag, blknum);
|
||||
let end_key = start_key.add(count as u32);
|
||||
version.get_vectored(self, start_key..end_key, ctx).await
|
||||
}
|
||||
|
||||
// Get size of a database in blocks
|
||||
pub(crate) async fn get_db_size(
|
||||
&self,
|
||||
@@ -1604,6 +1640,55 @@ impl<'a> DatadirModification<'a> {
|
||||
self.tline.get(key, lsn, ctx).await
|
||||
}
|
||||
|
||||
async fn get_vectored(
|
||||
&self,
|
||||
key_range: Range<Key>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
// Have we already updated the same key? Read the latest pending updated
|
||||
// version in that case.
|
||||
//
|
||||
// Note: we don't check pending_deletions. It is an error to request a
|
||||
// value that has been removed, deletion only avoids leaking storage.
|
||||
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
|
||||
let mut keys_in_modification = KeySpaceAccum::new();
|
||||
|
||||
let key = key_range.start;
|
||||
while key != key_range.end {
|
||||
if let Some(values) = self.pending_updates.get(&key) {
|
||||
if let Some((_, value)) = values.last() {
|
||||
keys_in_modification.add_key(key);
|
||||
|
||||
match value {
|
||||
Value::Image(img) => {
|
||||
results.insert(key, Ok(img.clone()));
|
||||
}
|
||||
_ => {
|
||||
results.insert(
|
||||
key,
|
||||
Err(PageReconstructError::from(anyhow::anyhow!(
|
||||
"unexpected pending WAL record"
|
||||
))),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
|
||||
|
||||
let mut keyspace = KeySpace {
|
||||
ranges: vec![key_range],
|
||||
};
|
||||
keyspace.remove_overlapping_with(&keys_in_modification.to_keyspace());
|
||||
|
||||
let pages = self.tline.get_vectored(keyspace, lsn, ctx).await?;
|
||||
results.extend(pages.into_iter());
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
fn put(&mut self, key: Key, val: Value) {
|
||||
let values = self.pending_updates.entry(key).or_default();
|
||||
// Replace the previous value if it exists at the same lsn
|
||||
@@ -1647,6 +1732,43 @@ impl<'a> Version<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_vectored(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
key_range: Range<Key>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(u8, Bytes), GetVectoredError> {
|
||||
let pages = match self {
|
||||
Version::Lsn(lsn) => {
|
||||
timeline
|
||||
.get_vectored(
|
||||
KeySpace {
|
||||
ranges: vec![key_range],
|
||||
},
|
||||
*lsn,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Version::Modified(modification) => modification.get_vectored(key_range, ctx).await,
|
||||
}?;
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
let page_count: u8 = pages.len().try_into().expect("too many pages returned");
|
||||
for page in pages {
|
||||
match page {
|
||||
(_key, Ok(bytes)) => {
|
||||
buf.extend_from_slice(&bytes[..]);
|
||||
}
|
||||
(_key, Err(err)) => {
|
||||
return Err(GetVectoredError::Other(anyhow!(err)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((page_count, buf.freeze()))
|
||||
}
|
||||
|
||||
fn get_lsn(&self) -> Lsn {
|
||||
match self {
|
||||
Version::Lsn(lsn) => *lsn,
|
||||
|
||||
@@ -74,6 +74,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
|
||||
}
|
||||
prev = Some(req);
|
||||
}
|
||||
PagestreamFeMessage::GetVectoredPages(_) => {}
|
||||
PagestreamFeMessage::DbSize(_) => {}
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user