Send LSN range in getpage request

This commit is contained in:
Konstantin Knizhnik
2024-02-10 21:54:41 +02:00
parent d47e4a2a41
commit 93e6046005
9 changed files with 271 additions and 180 deletions

View File

@@ -798,9 +798,10 @@ pub mod virtual_file {
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
GetLatestPage(PagestreamGetLatestPageRequest), // for compatinility with old clients
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
GetPage(PagestreamGetPageRequest),
}
// Wrapped in libpq CopyData
@@ -841,36 +842,44 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamExistsRequest {
pub latest: bool,
pub horizon: Lsn,
pub lsn: Lsn,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamNblocksRequest {
pub latest: bool,
pub horizon: Lsn,
pub lsn: Lsn,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetPageRequest {
pub struct PagestreamGetLatestPageRequest {
pub latest: bool,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetPageRequest {
pub horizon: Lsn,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamDbSizeRequest {
pub latest: bool,
pub horizon: Lsn,
pub lsn: Lsn,
pub dbnode: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetSlruSegmentRequest {
pub latest: bool,
pub horizon: Lsn,
pub lsn: Lsn,
pub kind: u8,
pub segno: u32,
@@ -924,7 +933,7 @@ impl PagestreamFeMessage {
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
@@ -934,7 +943,7 @@ impl PagestreamFeMessage {
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
@@ -942,7 +951,7 @@ impl PagestreamFeMessage {
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
Self::GetLatestPage(req) => {
bytes.put_u8(2);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
@@ -955,18 +964,29 @@ impl PagestreamFeMessage {
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
Self::GetSlruSegment(req) => {
bytes.put_u8(4);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
}
Self::GetPage(req) => {
bytes.put_u8(5);
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
}
bytes.into()
@@ -982,7 +1002,7 @@ impl PagestreamFeMessage {
let msg_tag = body.read_u8()?;
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.read_u8()? != 0,
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
@@ -992,7 +1012,7 @@ impl PagestreamFeMessage {
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.read_u8()? != 0,
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
@@ -1001,8 +1021,34 @@ impl PagestreamFeMessage {
forknum: body.read_u8()?,
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.read_u8()? != 0,
2 => Ok(PagestreamFeMessage::GetLatestPage(
PagestreamGetLatestPageRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
},
)),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
)),
5 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
@@ -1012,19 +1058,6 @@ impl PagestreamFeMessage {
},
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
)),
_ => bail!("unknown smgr message tag: {:?}", msg_tag),
}
}
@@ -1148,7 +1181,7 @@ mod tests {
// Test serialization/deserialization of PagestreamFeMessage
let messages = vec![
PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: true,
horizon: Lsn::MAX,
lsn: Lsn(4),
rel: RelTag {
forknum: 1,
@@ -1158,7 +1191,7 @@ mod tests {
},
}),
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: false,
horizon: Lsn::INVALID,
lsn: Lsn(4),
rel: RelTag {
forknum: 1,
@@ -1168,8 +1201,8 @@ mod tests {
},
}),
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: true,
lsn: Lsn(4),
horizon: Lsn::MAX,
lsn: Lsn::INVALID,
rel: RelTag {
forknum: 1,
spcnode: 2,
@@ -1179,7 +1212,7 @@ mod tests {
blkno: 7,
}),
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: true,
horizon: Lsn::MAX,
lsn: Lsn(4),
dbnode: 7,
}),

View File

@@ -312,7 +312,11 @@ async fn main_impl(
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
horizon: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,

View File

@@ -361,9 +361,10 @@ where
/// Add contents of relfilenode `src`, naming it as `dst`.
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
let horizon = self.lsn; // we do not need latest version
let nblocks = self
.timeline
.get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx)
.get_rel_size(src, Version::Lsn(self.lsn), horizon, self.ctx)
.await?;
// If the relation is empty, create an empty file
@@ -384,7 +385,7 @@ where
for blknum in startblk..endblk {
let img = self
.timeline
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx)
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), horizon, self.ctx)
.await?;
segment_data.extend_from_slice(&img[..]);
}

View File

@@ -637,6 +637,25 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::GetLatestPage(old_req) => {
let req = PagestreamGetPageRequest {
horizon: if old_req.latest {
Lsn::MAX
} else {
old_req.lsn
},
lsn: old_req.lsn,
rel: old_req.rel,
blkno: old_req.blkno,
};
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
(
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
PagestreamFeMessage::GetPage(req) => {
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
@@ -849,67 +868,47 @@ impl PageServerHandler {
/// the LSN that should be used to look up the page versions.
async fn wait_or_get_last_lsn(
timeline: &Timeline,
mut lsn: Lsn,
latest: bool,
lsn: Lsn,
horizon: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
ctx: &RequestContext,
) -> Result<Lsn, PageStreamError> {
if latest {
// Latest page version was requested. If LSN is given, it is a hint
// to the page server that there have been no modifications to the
// page after that LSN. If we haven't received WAL up to that point,
// wait until it arrives.
let last_record_lsn = timeline.get_last_record_lsn();
// Note: this covers the special case that lsn == Lsn(0). That
// special case means "return the latest version whatever it is",
// and it's used for bootstrapping purposes, when the page server is
// connected directly to the compute node. That is needed because
// when you connect to the compute node, to receive the WAL, the
// walsender process will do a look up in the pg_authid catalog
// table for authentication. That poses a deadlock problem: the
// catalog table lookup will send a GetPage request, but the GetPage
// request will block in the page server because the recent WAL
// hasn't been received yet, and it cannot be received until the
// walsender completes the authentication and starts streaming the
// WAL.
if lsn <= last_record_lsn {
lsn = last_record_lsn;
} else {
timeline
.wait_lsn(
lsn,
crate::tenant::timeline::WaitLsnWaiter::PageService,
ctx,
)
.await?;
// Since we waited for 'lsn' to arrive, that is now the last
// record LSN. (Or close enough for our purposes; the
// last-record LSN can advance immediately after we return
// anyway)
}
let last_record_lsn = timeline.get_last_record_lsn();
// Horizon = 0 (INVALID) is treated as LSN interval degenerated to point [lsn,lsn].
// It as done mostly for convenience (because such get_page commands are widely used in tests) and
// also seems to be logical: Lsn::MAX moves upper boundary of LSN interval till last_record_lsn and
// Lsn(0) moves upper boundary to lower boundary.
let request_horizon = if horizon == Lsn::INVALID {
lsn
} else {
if lsn == Lsn(0) {
return Err(PageStreamError::BadRequest(
"invalid LSN(0) in request".into(),
));
}
horizon
};
let effective_lsn = Lsn::max(lsn, Lsn::min(request_horizon, last_record_lsn));
if effective_lsn > last_record_lsn {
timeline
.wait_lsn(
lsn,
effective_lsn,
crate::tenant::timeline::WaitLsnWaiter::PageService,
ctx,
)
.await?;
// Since we waited for 'lsn' to arrive, that is now the last
// record LSN. (Or close enough for our purposes; the
// last-record LSN can advance immediately after we return
// anyway)
} else if effective_lsn == Lsn(0) {
return Err(PageStreamError::BadRequest(
"invalid LSN(0) in request".into(),
));
}
if lsn < **latest_gc_cutoff_lsn {
if effective_lsn < **latest_gc_cutoff_lsn {
return Err(PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
lsn, **latest_gc_cutoff_lsn
effective_lsn, **latest_gc_cutoff_lsn
).into()));
}
Ok(lsn)
Ok(effective_lsn)
}
#[instrument(skip_all, fields(shard_id))]
@@ -927,11 +926,11 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
.await?;
let exists = timeline
.get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
.get_rel_exists(req.rel, Version::Lsn(lsn), req.horizon, ctx)
.await?;
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
@@ -955,11 +954,11 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
.await?;
let n_blocks = timeline
.get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
.get_rel_size(req.rel, Version::Lsn(lsn), req.horizon, ctx)
.await?;
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
@@ -983,7 +982,7 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
.await?;
let total_blocks = timeline
@@ -991,7 +990,7 @@ impl PageServerHandler {
DEFAULTTABLESPACE_OID,
req.dbnode,
Version::Lsn(lsn),
req.latest,
req.horizon,
ctx,
)
.await?;
@@ -1161,11 +1160,11 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
.await?;
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.horizon, ctx)
.await?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -1189,7 +1188,7 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
.await?;
let kind = SlruKind::from_repr(req.kind)

View File

@@ -175,7 +175,7 @@ impl Timeline {
tag: RelTag,
blknum: BlockNumber,
version: Version<'_>,
latest: bool,
horizon: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
@@ -184,7 +184,7 @@ impl Timeline {
));
}
let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
let nblocks = self.get_rel_size(tag, version, horizon, ctx).await?;
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
@@ -206,7 +206,7 @@ impl Timeline {
spcnode: Oid,
dbnode: Oid,
version: Version<'_>,
latest: bool,
horizon: Lsn,
ctx: &RequestContext,
) -> Result<usize, PageReconstructError> {
let mut total_blocks = 0;
@@ -214,7 +214,7 @@ impl Timeline {
let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
for rel in rels {
let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
let n_blocks = self.get_rel_size(rel, version, horizon, ctx).await?;
total_blocks += n_blocks as usize;
}
Ok(total_blocks)
@@ -225,7 +225,7 @@ impl Timeline {
&self,
tag: RelTag,
version: Version<'_>,
latest: bool,
horizon: Lsn,
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
if tag.relnode == 0 {
@@ -239,7 +239,7 @@ impl Timeline {
}
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
&& !self.get_rel_exists(tag, version, latest, ctx).await?
&& !self.get_rel_exists(tag, version, horizon, ctx).await?
{
// FIXME: Postgres sometimes calls smgrcreate() to create
// FSM, and smgrnblocks() on it immediately afterwards,
@@ -252,14 +252,8 @@ impl Timeline {
let mut buf = version.get(self, key, ctx).await?;
let nblocks = buf.get_u32_le();
if latest {
// Update relation size cache only if "latest" flag is set.
// This flag is set by compute when it is working with most recent version of relation.
// Typically master compute node always set latest=true.
// Please notice, that even if compute node "by mistake" specifies old LSN but set
// latest=true, then it can not cause cache corruption, because with latest=true
// pageserver choose max(request_lsn, last_written_lsn) and so cached value will be
// associated with most recent value of LSN.
if horizon == Lsn::MAX {
// Update relation size cache only if latest version is requested.
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
}
Ok(nblocks)
@@ -270,7 +264,7 @@ impl Timeline {
&self,
tag: RelTag,
version: Version<'_>,
_latest: bool,
_horizon: Lsn,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
if tag.relnode == 0 {
@@ -1088,7 +1082,7 @@ impl<'a> DatadirModification<'a> {
) -> anyhow::Result<()> {
let total_blocks = self
.tline
.get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
.get_db_size(spcnode, dbnode, Version::Modified(self), Lsn::MAX, ctx)
.await?;
// Remove entry from dbdir
@@ -1187,7 +1181,7 @@ impl<'a> DatadirModification<'a> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
if self
.tline
.get_rel_exists(rel, Version::Modified(self), true, ctx)
.get_rel_exists(rel, Version::Modified(self), Lsn::MAX, ctx)
.await?
{
let size_key = rel_size_to_key(rel);

View File

@@ -1034,7 +1034,7 @@ impl WalIngest {
let nblocks = modification
.tline
.get_rel_size(src_rel, Version::Modified(modification), true, ctx)
.get_rel_size(src_rel, Version::Modified(modification), Lsn::MAX, ctx)
.await?;
let dst_rel = RelTag {
spcnode: tablespace_id,
@@ -1072,7 +1072,7 @@ impl WalIngest {
src_rel,
blknum,
Version::Modified(modification),
true,
Lsn::MAX,
ctx,
)
.await?;
@@ -1242,7 +1242,7 @@ impl WalIngest {
};
if modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
.await?
{
self.put_rel_drop(modification, rel, ctx).await?;
@@ -1541,7 +1541,7 @@ impl WalIngest {
nblocks
} else if !modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
.await?
{
// create it with 0 size initially, the logic below will extend it
@@ -1553,7 +1553,7 @@ impl WalIngest {
} else {
modification
.tline
.get_rel_size(rel, Version::Modified(modification), true, ctx)
.get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx)
.await?
};
@@ -1650,14 +1650,14 @@ async fn get_relsize(
) -> anyhow::Result<BlockNumber> {
let nblocks = if !modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
.await?
{
0
} else {
modification
.tline
.get_rel_size(rel, Version::Modified(modification), true, ctx)
.get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx)
.await?
};
Ok(nblocks)
@@ -1732,29 +1732,29 @@ mod tests {
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
.await?,
false
);
assert!(tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
.await
.is_err());
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
1
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
3
);
@@ -1762,46 +1762,46 @@ mod tests {
// Check page contents at each LSN
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 0 at 2")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 2 at 5")
);
@@ -1817,19 +1817,19 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
.await?,
2
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 1 at 4")
);
@@ -1837,13 +1837,13 @@ mod tests {
// should still see the truncated block with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
3
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 2 at 5")
);
@@ -1856,7 +1856,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), Lsn::INVALID, &ctx)
.await?,
0
);
@@ -1869,19 +1869,19 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx)
.await?,
2
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx)
.await?,
ZERO_PAGE
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 1")
);
@@ -1894,21 +1894,27 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
.await?,
1501
);
for blk in 2..1500 {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
blk,
Version::Lsn(Lsn(0x80)),
Lsn::INVALID,
&ctx
)
.await?,
ZERO_PAGE
);
}
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 1500")
);
@@ -1935,13 +1941,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
1
);
@@ -1954,7 +1960,7 @@ mod tests {
// Check that rel is not visible anymore
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), Lsn::INVALID, &ctx)
.await?,
false
);
@@ -1972,13 +1978,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
.await?,
1
);
@@ -2011,24 +2017,24 @@ mod tests {
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
.await?,
false
);
assert!(tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
.await
.is_err());
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
relsize
);
@@ -2039,7 +2045,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), Lsn::INVALID, &ctx)
.await?,
test_img(&data)
);
@@ -2056,7 +2062,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
.await?,
1
);
@@ -2066,7 +2072,13 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x60)),
Lsn::INVALID,
&ctx
)
.await?,
test_img(&data)
);
@@ -2075,7 +2087,7 @@ mod tests {
// should still see all blocks with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
relsize
);
@@ -2084,7 +2096,13 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x50)),
Lsn::INVALID,
&ctx
)
.await?,
test_img(&data)
);
@@ -2104,13 +2122,13 @@ mod tests {
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
.await?,
relsize
);
@@ -2120,7 +2138,13 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x80)),
Lsn::INVALID,
&ctx
)
.await?,
test_img(&data)
);
@@ -2154,7 +2178,7 @@ mod tests {
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
.await?,
RELSEG_SIZE + 1
);
@@ -2168,7 +2192,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
.await?,
RELSEG_SIZE
);
@@ -2183,7 +2207,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
.await?,
RELSEG_SIZE - 1
);
@@ -2201,7 +2225,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
.await?,
size as BlockNumber
);

View File

@@ -33,9 +33,10 @@ typedef enum
/* pagestore_client -> pagestore */
T_NeonExistsRequest = 0,
T_NeonNblocksRequest,
T_NeonGetPageRequest,
T_NeonGetLatestPageRequest, /* old format of get_page command */
T_NeonDbSizeRequest,
T_NeonGetSlruSegmentRequest,
T_NeonGetPageRequest,
/* pagestore -> pagestore_client */
T_NeonExistsResponse = 100,
@@ -79,7 +80,7 @@ typedef enum {
typedef struct
{
NeonMessageTag tag;
bool latest; /* if true, request latest page version */
XLogRecPtr horizon; /* uppe boundary for page LSN */
XLogRecPtr lsn; /* request page version @ this LSN */
} NeonRequest;

View File

@@ -110,6 +110,20 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
static bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
#define MAX_LSN ((XLogRecPtr)~0)
/*
* There are three kinds of get_page :
* 1. Master compute: get the latest page not older than specified LSN (horizon=Lsn::MAX)
* 2. RO replica: get the latest page not newer than current WAL position replica already applied (horizon=GetXLogReplayRecPtr(NULL))
* 3. Snapshot: get latest page not new than specified LSN (horizon=request_lsn)
*/
static XLogRecPtr
neon_get_horizon(bool latest)
{
return latest ? MAX_LSN : RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : InvalidXLogRecPtr; /* horizon=InvalidXlogRecPtr is replaced with request_lsn at PS */
}
/*
* Prefetch implementation:
*
@@ -687,9 +701,10 @@ static void
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
{
bool found;
bool latest;
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = false,
.req.horizon = 0,
.req.lsn = 0,
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
.forknum = slot->buftag.forkNum,
@@ -699,13 +714,13 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
if (force_lsn && force_latest)
{
request.req.lsn = *force_lsn;
request.req.latest = *force_latest;
latest = *force_latest;
slot->actual_request_lsn = slot->effective_request_lsn = *force_lsn;
}
else
{
XLogRecPtr lsn = neon_get_request_lsn(
&request.req.latest,
&latest,
BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum,
slot->buftag.blockNum
@@ -733,6 +748,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
prefetch_lsn = Max(prefetch_lsn, lsn);
slot->effective_request_lsn = prefetch_lsn;
}
request.req.horizon = neon_get_horizon(latest);
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
@@ -1006,7 +1022,7 @@ nm_pack_request(NeonRequest *msg)
{
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.horizon);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
@@ -1019,7 +1035,7 @@ nm_pack_request(NeonRequest *msg)
{
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.horizon);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
@@ -1032,7 +1048,7 @@ nm_pack_request(NeonRequest *msg)
{
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.horizon);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->dbNode);
@@ -1042,7 +1058,7 @@ nm_pack_request(NeonRequest *msg)
{
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.horizon);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
@@ -1057,7 +1073,7 @@ nm_pack_request(NeonRequest *msg)
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.horizon);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendbyte(&s, msg_req->kind);
pq_sendint32(&s, msg_req->segno);
@@ -1209,7 +1225,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
appendStringInfoChar(&s, '}');
break;
}
@@ -1222,7 +1238,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
appendStringInfoChar(&s, '}');
break;
}
@@ -1236,7 +1252,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
appendStringInfoChar(&s, '}');
break;
}
@@ -1247,7 +1263,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
appendStringInfoChar(&s, '}');
break;
}
@@ -1259,7 +1275,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
appendStringInfoChar(&s, '}');
break;
}
@@ -1664,7 +1680,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
{
NeonExistsRequest request = {
.req.tag = T_NeonExistsRequest,
.req.latest = latest,
.req.horizon = neon_get_horizon(latest),
.req.lsn = request_lsn,
.rinfo = InfoFromSMgrRel(reln),
.forknum = forkNum};
@@ -2474,7 +2490,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
{
NeonNblocksRequest request = {
.req.tag = T_NeonNblocksRequest,
.req.latest = latest,
.req.horizon = neon_get_horizon(latest),
.req.lsn = request_lsn,
.rinfo = InfoFromSMgrRel(reln),
.forknum = forknum,
@@ -2531,7 +2547,7 @@ neon_dbsize(Oid dbNode)
{
NeonDbSizeRequest request = {
.req.tag = T_NeonDbSizeRequest,
.req.latest = latest,
.req.horizon = neon_get_horizon(latest),
.req.lsn = request_lsn,
.dbNode = dbNode,
};
@@ -2827,7 +2843,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
NeonResponse *resp;
NeonGetSlruSegmentRequest request = {
.req.tag = T_NeonGetSlruSegmentRequest,
.req.latest = false,
.req.horizon = InvalidXLogRecPtr,
.req.lsn = request_lsn,
.kind = kind,
@@ -2980,7 +2996,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
NeonNblocksRequest request = {
.req = (NeonRequest) {
.lsn = end_recptr,
.latest = false,
.horizon = neon_get_horizon(false),
.tag = T_NeonNblocksRequest,
},
.rinfo = rinfo,

View File

@@ -7,7 +7,9 @@ use std::{
io::BufReader,
};
use pageserver_api::models::{PagestreamFeMessage, PagestreamGetPageRequest};
use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetLatestPageRequest, PagestreamGetPageRequest,
};
use utils::id::{ConnectionId, TenantId, TimelineId};
use clap::{Parser, Subcommand};
@@ -51,9 +53,11 @@ enum Command {
// - detect any prefetching anomalies by looking for negative deltas during seqscan
fn analyze_trace<R: std::io::Read>(mut reader: R) {
let mut total = 0; // Total requests traced
let mut old = 0; // Old requests traced
let mut cross_rel = 0; // Requests that ask for different rel than previous request
let mut deltas = HashMap::<i32, u32>::new(); // Consecutive blkno differences
let mut prev: Option<PagestreamGetPageRequest> = None;
let mut old_prev: Option<PagestreamGetLatestPageRequest> = None;
// Compute stats
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
@@ -61,6 +65,20 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
PagestreamFeMessage::GetSlruSegment(_) => {}
PagestreamFeMessage::GetLatestPage(req) => {
total += 1;
old += 1;
if let Some(prev) = old_prev {
if prev.rel == req.rel {
let delta = (req.blkno as i32) - (prev.blkno as i32);
deltas.entry(delta).and_modify(|c| *c += 1).or_insert(1);
} else {
cross_rel += 1;
}
}
old_prev = Some(req);
}
PagestreamFeMessage::GetPage(req) => {
total += 1;
@@ -83,6 +101,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
deltas.retain(|_, count| *count > 300);
other -= deltas.len();
dbg!(total);
dbg!(old);
dbg!(cross_rel);
dbg!(other);
dbg!(deltas);