test: get control values req

This commit is contained in:
Vlad Lazar
2024-03-04 16:04:08 +00:00
parent 2d7432231f
commit cffe724b01
4 changed files with 63 additions and 0 deletions

View File

@@ -739,6 +739,7 @@ pub enum PagestreamFeMessage {
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
GetVectoredPages(PagestreamGetVectoredPagesRequest),
GetControlFileValues(PagestreamGetControlFileValuesRequest),
}
// Wrapped in libpq CopyData
@@ -751,6 +752,7 @@ pub enum PagestreamBeMessage {
DbSize(PagestreamDbSizeResponse),
GetSlruSegment(PagestreamGetSlruSegmentResponse),
GetVectoredPages(PagestreamGetVectoredPagesResponse),
GetControlFileValues(PagestreamGetControlFileValuesResponse),
}
// Keep in sync with `pagestore_client.h`
@@ -763,6 +765,7 @@ enum PagestreamBeMessageTag {
DbSize = 104,
GetSlruSegment = 105,
GetVectoredPages = 106,
GetControlFileValues = 107,
}
impl TryFrom<u8> for PagestreamBeMessageTag {
type Error = u8;
@@ -826,6 +829,11 @@ pub struct PagestreamGetVectoredPagesRequest {
pub count: u8,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetControlFileValuesRequest {
pub lsn: Lsn,
}
#[derive(Debug)]
pub struct PagestreamExistsResponse {
pub exists: bool,
@@ -852,6 +860,9 @@ pub struct PagestreamGetVectoredPagesResponse {
pub pages: Bytes,
}
#[derive(Debug)]
pub struct PagestreamGetControlFileValuesResponse {}
#[derive(Debug)]
pub struct PagestreamErrorResponse {
pub message: String,
@@ -935,6 +946,10 @@ impl PagestreamFeMessage {
bytes.put_u32(req.blkno);
bytes.put_u8(req.count);
}
Self::GetControlFileValues(req) => {
bytes.put_u8(6);
bytes.put_u64(req.lsn.0);
}
}
bytes.into()
@@ -1007,6 +1022,11 @@ impl PagestreamFeMessage {
count: body.read_u8()?,
},
)),
6 => Ok(PagestreamFeMessage::GetControlFileValues(
PagestreamGetControlFileValuesRequest {
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
},
)),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
@@ -1054,6 +1074,10 @@ impl PagestreamBeMessage {
bytes.put_u8(resp.page_count);
bytes.put(&resp.pages[..]);
}
Self::GetControlFileValues(_resp) => {
bytes.put_u8(Tag::GetControlFileValues as u8);
}
}
bytes.into()
@@ -1111,6 +1135,9 @@ impl PagestreamBeMessage {
pages: pages.into(),
})
}
Tag::GetControlFileValues => {
Self::GetControlFileValues(PagestreamGetControlFileValuesResponse {})
}
};
let remaining = buf.into_inner();
if !remaining.is_empty() {
@@ -1131,6 +1158,7 @@ impl PagestreamBeMessage {
Self::DbSize(_) => "DbSize",
Self::GetSlruSegment(_) => "GetSlruSegment",
Self::GetVectoredPages(_) => "GetVectoredPages",
Self::GetControlFileValues(_) => "GetControlFileValues",
}
}
}

View File

@@ -159,6 +159,7 @@ impl PagestreamClient {
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetSlruSegment(_)
| PagestreamBeMessage::GetControlFileValues(_)
| PagestreamBeMessage::GetVectoredPages(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",
@@ -190,6 +191,7 @@ impl PagestreamClient {
| PagestreamBeMessage::Nblocks(_)
| PagestreamBeMessage::DbSize(_)
| PagestreamBeMessage::GetSlruSegment(_)
| PagestreamBeMessage::GetControlFileValues(_)
| PagestreamBeMessage::GetPage(_) => {
anyhow::bail!(
"unexpected be message kind in response to getpage request: {}",

View File

@@ -17,6 +17,11 @@ use futures::stream::FuturesUnordered;
use futures::Stream;
use futures::StreamExt;
use pageserver_api::key::Key;
use pageserver_api::key::AUX_FILES_KEY;
use pageserver_api::key::CONTROLFILE_KEY;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::PagestreamGetControlFileValuesRequest;
use pageserver_api::models::PagestreamGetControlFileValuesResponse;
use pageserver_api::models::PagestreamGetVectoredPagesRequest;
use pageserver_api::models::PagestreamGetVectoredPagesResponse;
use pageserver_api::models::TenantState;
@@ -690,6 +695,15 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::GetControlFileValues(req) => {
let span = tracing::info_span!("handle_get_control_values", req_lsn = %req.lsn);
(
self.handle_get_control_values(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
};
match response {
@@ -1184,6 +1198,24 @@ impl PageServerHandler {
page,
}))
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_control_values(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamGetControlFileValuesRequest,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = self.get_timeline_shard_zero(tenant_id, timeline_id).await?;
let keyspace = KeySpace {
ranges: vec![CONTROLFILE_KEY..AUX_FILES_KEY.next()],
};
let _ = timeline.get_vectored(keyspace, req.lsn, ctx).await;
Ok(PagestreamBeMessage::GetControlFileValues(
PagestreamGetControlFileValuesResponse {},
))
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_pages_at_lsn_request(

View File

@@ -75,6 +75,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
prev = Some(req);
}
PagestreamFeMessage::GetVectoredPages(_) => {}
PagestreamFeMessage::GetControlFileValues(_) => {}
PagestreamFeMessage::DbSize(_) => {}
};
}