Use tags starting from 10 for command of new protocol

This commit is contained in:
Konstantin Knizhnik
2024-02-16 09:32:27 +02:00
parent 93e6046005
commit ccbf95e9dc
4 changed files with 41 additions and 99 deletions

View File

@@ -798,10 +798,9 @@ pub mod virtual_file {
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetLatestPage(PagestreamGetLatestPageRequest), // for compatinility with old clients
GetPage(PagestreamGetPageRequest),
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
GetPage(PagestreamGetPageRequest),
}
// Wrapped in libpq CopyData
@@ -854,14 +853,6 @@ pub struct PagestreamNblocksRequest {
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetLatestPageRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetPageRequest {
pub horizon: Lsn,
@@ -951,9 +942,9 @@ impl PagestreamFeMessage {
bytes.put_u8(req.rel.forknum);
}
Self::GetLatestPage(req) => {
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
@@ -976,17 +967,6 @@ impl PagestreamFeMessage {
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
Self::GetPage(req) => {
bytes.put_u8(5);
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
}
bytes.into()
@@ -999,11 +979,25 @@ impl PagestreamFeMessage {
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
let mut msg_tag = body.read_u8()?;
let horizon = if msg_tag >= 10 {
// new protocol
msg_tag -= 10;
Lsn::from(body.read_u64::<BigEndian>()?)
} else {
// old_protocol
let latest = body.read_u8()? != 0;
if latest {
Lsn::MAX
} else {
Lsn::INVALID
}
};
let lsn = Lsn::from(body.read_u64::<BigEndian>()?);
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
horizon,
lsn,
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
@@ -1012,8 +1006,8 @@ impl PagestreamFeMessage {
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
horizon,
lsn,
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
@@ -1021,35 +1015,9 @@ impl PagestreamFeMessage {
forknum: body.read_u8()?,
},
})),
2 => Ok(PagestreamFeMessage::GetLatestPage(
PagestreamGetLatestPageRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
},
)),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
)),
5 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
horizon,
lsn,
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
@@ -1058,6 +1026,19 @@ impl PagestreamFeMessage {
},
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
horizon,
lsn,
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
horizon,
lsn,
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
)),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}

View File

@@ -637,25 +637,6 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::GetLatestPage(old_req) => {
let req = PagestreamGetPageRequest {
horizon: if old_req.latest {
Lsn::MAX
} else {
old_req.lsn
},
lsn: old_req.lsn,
rel: old_req.rel,
blkno: old_req.blkno,
};
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
(
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
PagestreamFeMessage::GetPage(req) => {
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);

View File

@@ -31,12 +31,11 @@
typedef enum
{
/* pagestore_client -> pagestore */
T_NeonExistsRequest = 0,
T_NeonExistsRequest = 10, /* new protocol message tags start from 10 */
T_NeonNblocksRequest,
T_NeonGetLatestPageRequest, /* old format of get_page command */
T_NeonGetPageRequest,
T_NeonDbSizeRequest,
T_NeonGetSlruSegmentRequest,
T_NeonGetPageRequest,
/* pagestore -> pagestore_client */
T_NeonExistsResponse = 100,

View File

@@ -7,9 +7,7 @@ use std::{
io::BufReader,
};
use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetLatestPageRequest, PagestreamGetPageRequest,
};
use pageserver_api::models::{PagestreamFeMessage, PagestreamGetPageRequest};
use utils::id::{ConnectionId, TenantId, TimelineId};
use clap::{Parser, Subcommand};
@@ -53,11 +51,9 @@ enum Command {
// - detect any prefetching anomalies by looking for negative deltas during seqscan
fn analyze_trace<R: std::io::Read>(mut reader: R) {
let mut total = 0; // Total requests traced
let mut old = 0; // Old requests traced
let mut cross_rel = 0; // Requests that ask for different rel than previous request
let mut deltas = HashMap::<i32, u32>::new(); // Consecutive blkno differences
let mut prev: Option<PagestreamGetPageRequest> = None;
let mut old_prev: Option<PagestreamGetLatestPageRequest> = None;
// Compute stats
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
@@ -65,20 +61,6 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
PagestreamFeMessage::GetSlruSegment(_) => {}
PagestreamFeMessage::GetLatestPage(req) => {
total += 1;
old += 1;
if let Some(prev) = old_prev {
if prev.rel == req.rel {
let delta = (req.blkno as i32) - (prev.blkno as i32);
deltas.entry(delta).and_modify(|c| *c += 1).or_insert(1);
} else {
cross_rel += 1;
}
}
old_prev = Some(req);
}
PagestreamFeMessage::GetPage(req) => {
total += 1;
@@ -101,7 +83,6 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
deltas.retain(|_, count| *count > 300);
other -= deltas.len();
dbg!(total);
dbg!(old);
dbg!(cross_rel);
dbg!(other);
dbg!(deltas);