mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-05 09:20:38 +00:00
Compare commits
8 Commits
conrad/pro
...
getpage_ls
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
de649f856c | ||
|
|
de3fdf9860 | ||
|
|
1b2cfc0259 | ||
|
|
165a1d7bf1 | ||
|
|
f07c33186a | ||
|
|
15c0e1351a | ||
|
|
ccbf95e9dc | ||
|
|
93e6046005 |
@@ -841,21 +841,21 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
|
|||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub struct PagestreamExistsRequest {
|
pub struct PagestreamExistsRequest {
|
||||||
pub latest: bool,
|
pub horizon: Lsn,
|
||||||
pub lsn: Lsn,
|
pub lsn: Lsn,
|
||||||
pub rel: RelTag,
|
pub rel: RelTag,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub struct PagestreamNblocksRequest {
|
pub struct PagestreamNblocksRequest {
|
||||||
pub latest: bool,
|
pub horizon: Lsn,
|
||||||
pub lsn: Lsn,
|
pub lsn: Lsn,
|
||||||
pub rel: RelTag,
|
pub rel: RelTag,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub struct PagestreamGetPageRequest {
|
pub struct PagestreamGetPageRequest {
|
||||||
pub latest: bool,
|
pub horizon: Lsn,
|
||||||
pub lsn: Lsn,
|
pub lsn: Lsn,
|
||||||
pub rel: RelTag,
|
pub rel: RelTag,
|
||||||
pub blkno: u32,
|
pub blkno: u32,
|
||||||
@@ -863,14 +863,14 @@ pub struct PagestreamGetPageRequest {
|
|||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub struct PagestreamDbSizeRequest {
|
pub struct PagestreamDbSizeRequest {
|
||||||
pub latest: bool,
|
pub horizon: Lsn,
|
||||||
pub lsn: Lsn,
|
pub lsn: Lsn,
|
||||||
pub dbnode: u32,
|
pub dbnode: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, PartialEq, Eq)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
pub struct PagestreamGetSlruSegmentRequest {
|
pub struct PagestreamGetSlruSegmentRequest {
|
||||||
pub latest: bool,
|
pub horizon: Lsn,
|
||||||
pub lsn: Lsn,
|
pub lsn: Lsn,
|
||||||
pub kind: u8,
|
pub kind: u8,
|
||||||
pub segno: u32,
|
pub segno: u32,
|
||||||
@@ -923,8 +923,8 @@ impl PagestreamFeMessage {
|
|||||||
|
|
||||||
match self {
|
match self {
|
||||||
Self::Exists(req) => {
|
Self::Exists(req) => {
|
||||||
bytes.put_u8(0);
|
bytes.put_u8(10);
|
||||||
bytes.put_u8(u8::from(req.latest));
|
bytes.put_u64(req.horizon.0);
|
||||||
bytes.put_u64(req.lsn.0);
|
bytes.put_u64(req.lsn.0);
|
||||||
bytes.put_u32(req.rel.spcnode);
|
bytes.put_u32(req.rel.spcnode);
|
||||||
bytes.put_u32(req.rel.dbnode);
|
bytes.put_u32(req.rel.dbnode);
|
||||||
@@ -933,8 +933,8 @@ impl PagestreamFeMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Self::Nblocks(req) => {
|
Self::Nblocks(req) => {
|
||||||
bytes.put_u8(1);
|
bytes.put_u8(11);
|
||||||
bytes.put_u8(u8::from(req.latest));
|
bytes.put_u64(req.horizon.0);
|
||||||
bytes.put_u64(req.lsn.0);
|
bytes.put_u64(req.lsn.0);
|
||||||
bytes.put_u32(req.rel.spcnode);
|
bytes.put_u32(req.rel.spcnode);
|
||||||
bytes.put_u32(req.rel.dbnode);
|
bytes.put_u32(req.rel.dbnode);
|
||||||
@@ -943,8 +943,8 @@ impl PagestreamFeMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Self::GetPage(req) => {
|
Self::GetPage(req) => {
|
||||||
bytes.put_u8(2);
|
bytes.put_u8(12);
|
||||||
bytes.put_u8(u8::from(req.latest));
|
bytes.put_u64(req.horizon.0);
|
||||||
bytes.put_u64(req.lsn.0);
|
bytes.put_u64(req.lsn.0);
|
||||||
bytes.put_u32(req.rel.spcnode);
|
bytes.put_u32(req.rel.spcnode);
|
||||||
bytes.put_u32(req.rel.dbnode);
|
bytes.put_u32(req.rel.dbnode);
|
||||||
@@ -954,15 +954,15 @@ impl PagestreamFeMessage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Self::DbSize(req) => {
|
Self::DbSize(req) => {
|
||||||
bytes.put_u8(3);
|
bytes.put_u8(13);
|
||||||
bytes.put_u8(u8::from(req.latest));
|
bytes.put_u64(req.horizon.0);
|
||||||
bytes.put_u64(req.lsn.0);
|
bytes.put_u64(req.lsn.0);
|
||||||
bytes.put_u32(req.dbnode);
|
bytes.put_u32(req.dbnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
Self::GetSlruSegment(req) => {
|
Self::GetSlruSegment(req) => {
|
||||||
bytes.put_u8(4);
|
bytes.put_u8(14);
|
||||||
bytes.put_u8(u8::from(req.latest));
|
bytes.put_u64(req.horizon.0);
|
||||||
bytes.put_u64(req.lsn.0);
|
bytes.put_u64(req.lsn.0);
|
||||||
bytes.put_u8(req.kind);
|
bytes.put_u8(req.kind);
|
||||||
bytes.put_u32(req.segno);
|
bytes.put_u32(req.segno);
|
||||||
@@ -979,11 +979,32 @@ impl PagestreamFeMessage {
|
|||||||
//
|
//
|
||||||
// TODO: consider using protobuf or serde bincode for less error prone
|
// TODO: consider using protobuf or serde bincode for less error prone
|
||||||
// serialization.
|
// serialization.
|
||||||
let msg_tag = body.read_u8()?;
|
let mut msg_tag = body.read_u8()?;
|
||||||
|
//
|
||||||
|
// Old version of protocol use commands with tags started with 0 and containing `latest` flag.
|
||||||
|
// New version of protocol shift command tags by 10 and pass LSN range instead of `latest` flag.
|
||||||
|
// Server should be able to handle both protocol version. As far as we are not passing no=w,
|
||||||
|
// protocol version from client to server, we make a decision based on tag range.
|
||||||
|
// So this code actually provides backward compatibility.
|
||||||
|
//
|
||||||
|
let horizon = if msg_tag >= 10 {
|
||||||
|
// new protocol
|
||||||
|
msg_tag -= 10; // commands tags in new protocol starts with 10
|
||||||
|
Lsn::from(body.read_u64::<BigEndian>()?)
|
||||||
|
} else {
|
||||||
|
// old_protocol
|
||||||
|
let latest = body.read_u8()? != 0;
|
||||||
|
if latest {
|
||||||
|
Lsn::MAX // get latest version
|
||||||
|
} else {
|
||||||
|
Lsn::INVALID // get version on specified LSN
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let lsn = Lsn::from(body.read_u64::<BigEndian>()?);
|
||||||
match msg_tag {
|
match msg_tag {
|
||||||
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||||
latest: body.read_u8()? != 0,
|
horizon,
|
||||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
lsn,
|
||||||
rel: RelTag {
|
rel: RelTag {
|
||||||
spcnode: body.read_u32::<BigEndian>()?,
|
spcnode: body.read_u32::<BigEndian>()?,
|
||||||
dbnode: body.read_u32::<BigEndian>()?,
|
dbnode: body.read_u32::<BigEndian>()?,
|
||||||
@@ -992,8 +1013,8 @@ impl PagestreamFeMessage {
|
|||||||
},
|
},
|
||||||
})),
|
})),
|
||||||
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||||
latest: body.read_u8()? != 0,
|
horizon,
|
||||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
lsn,
|
||||||
rel: RelTag {
|
rel: RelTag {
|
||||||
spcnode: body.read_u32::<BigEndian>()?,
|
spcnode: body.read_u32::<BigEndian>()?,
|
||||||
dbnode: body.read_u32::<BigEndian>()?,
|
dbnode: body.read_u32::<BigEndian>()?,
|
||||||
@@ -1002,8 +1023,8 @@ impl PagestreamFeMessage {
|
|||||||
},
|
},
|
||||||
})),
|
})),
|
||||||
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||||
latest: body.read_u8()? != 0,
|
horizon,
|
||||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
lsn,
|
||||||
rel: RelTag {
|
rel: RelTag {
|
||||||
spcnode: body.read_u32::<BigEndian>()?,
|
spcnode: body.read_u32::<BigEndian>()?,
|
||||||
dbnode: body.read_u32::<BigEndian>()?,
|
dbnode: body.read_u32::<BigEndian>()?,
|
||||||
@@ -1013,14 +1034,14 @@ impl PagestreamFeMessage {
|
|||||||
blkno: body.read_u32::<BigEndian>()?,
|
blkno: body.read_u32::<BigEndian>()?,
|
||||||
})),
|
})),
|
||||||
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||||
latest: body.read_u8()? != 0,
|
horizon,
|
||||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
lsn,
|
||||||
dbnode: body.read_u32::<BigEndian>()?,
|
dbnode: body.read_u32::<BigEndian>()?,
|
||||||
})),
|
})),
|
||||||
4 => Ok(PagestreamFeMessage::GetSlruSegment(
|
4 => Ok(PagestreamFeMessage::GetSlruSegment(
|
||||||
PagestreamGetSlruSegmentRequest {
|
PagestreamGetSlruSegmentRequest {
|
||||||
latest: body.read_u8()? != 0,
|
horizon,
|
||||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
lsn,
|
||||||
kind: body.read_u8()?,
|
kind: body.read_u8()?,
|
||||||
segno: body.read_u32::<BigEndian>()?,
|
segno: body.read_u32::<BigEndian>()?,
|
||||||
},
|
},
|
||||||
@@ -1148,7 +1169,7 @@ mod tests {
|
|||||||
// Test serialization/deserialization of PagestreamFeMessage
|
// Test serialization/deserialization of PagestreamFeMessage
|
||||||
let messages = vec![
|
let messages = vec![
|
||||||
PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||||
latest: true,
|
horizon: Lsn::MAX,
|
||||||
lsn: Lsn(4),
|
lsn: Lsn(4),
|
||||||
rel: RelTag {
|
rel: RelTag {
|
||||||
forknum: 1,
|
forknum: 1,
|
||||||
@@ -1158,7 +1179,7 @@ mod tests {
|
|||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||||
latest: false,
|
horizon: Lsn::INVALID,
|
||||||
lsn: Lsn(4),
|
lsn: Lsn(4),
|
||||||
rel: RelTag {
|
rel: RelTag {
|
||||||
forknum: 1,
|
forknum: 1,
|
||||||
@@ -1168,8 +1189,8 @@ mod tests {
|
|||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||||
latest: true,
|
horizon: Lsn::MAX,
|
||||||
lsn: Lsn(4),
|
lsn: Lsn::INVALID,
|
||||||
rel: RelTag {
|
rel: RelTag {
|
||||||
forknum: 1,
|
forknum: 1,
|
||||||
spcnode: 2,
|
spcnode: 2,
|
||||||
@@ -1179,7 +1200,7 @@ mod tests {
|
|||||||
blkno: 7,
|
blkno: 7,
|
||||||
}),
|
}),
|
||||||
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||||
latest: true,
|
horizon: Lsn::MAX,
|
||||||
lsn: Lsn(4),
|
lsn: Lsn(4),
|
||||||
dbnode: 7,
|
dbnode: 7,
|
||||||
}),
|
}),
|
||||||
|
|||||||
@@ -312,7 +312,11 @@ async fn main_impl(
|
|||||||
let (rel_tag, block_no) =
|
let (rel_tag, block_no) =
|
||||||
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
|
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
|
||||||
PagestreamGetPageRequest {
|
PagestreamGetPageRequest {
|
||||||
latest: rng.gen_bool(args.req_latest_probability),
|
horizon: if rng.gen_bool(args.req_latest_probability) {
|
||||||
|
Lsn::MAX
|
||||||
|
} else {
|
||||||
|
r.timeline_lsn
|
||||||
|
},
|
||||||
lsn: r.timeline_lsn,
|
lsn: r.timeline_lsn,
|
||||||
rel: rel_tag,
|
rel: rel_tag,
|
||||||
blkno: block_no,
|
blkno: block_no,
|
||||||
|
|||||||
@@ -361,9 +361,10 @@ where
|
|||||||
|
|
||||||
/// Add contents of relfilenode `src`, naming it as `dst`.
|
/// Add contents of relfilenode `src`, naming it as `dst`.
|
||||||
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
|
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
|
||||||
|
let horizon = self.lsn; // we do not need latest version
|
||||||
let nblocks = self
|
let nblocks = self
|
||||||
.timeline
|
.timeline
|
||||||
.get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx)
|
.get_rel_size(src, Version::Lsn(self.lsn), horizon, self.ctx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// If the relation is empty, create an empty file
|
// If the relation is empty, create an empty file
|
||||||
@@ -384,7 +385,7 @@ where
|
|||||||
for blknum in startblk..endblk {
|
for blknum in startblk..endblk {
|
||||||
let img = self
|
let img = self
|
||||||
.timeline
|
.timeline
|
||||||
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx)
|
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), horizon, self.ctx)
|
||||||
.await?;
|
.await?;
|
||||||
segment_data.extend_from_slice(&img[..]);
|
segment_data.extend_from_slice(&img[..]);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -847,69 +847,66 @@ impl PageServerHandler {
|
|||||||
/// In either case, if the page server hasn't received the WAL up to the
|
/// In either case, if the page server hasn't received the WAL up to the
|
||||||
/// requested LSN yet, we will wait for it to arrive. The return value is
|
/// requested LSN yet, we will wait for it to arrive. The return value is
|
||||||
/// the LSN that should be used to look up the page versions.
|
/// the LSN that should be used to look up the page versions.
|
||||||
|
///
|
||||||
|
/// Compute needs to specify:
|
||||||
|
/// 1. "desired" LSN - which LSN compute expects to be acceptable
|
||||||
|
/// 2. Upper boundary LSN - PS should not send page with greater LSN to preserver consistency
|
||||||
|
///
|
||||||
|
/// In case of primary node then upper boundary is always +inf: nobody except this node can produce more recent version of the page.
|
||||||
|
/// In case of replica it is not true: replica can lag from primary node and PS and should not receive pages newer than its last_replay_lsn.
|
||||||
|
/// But it is not good always to request pages at `last_replay_lsn` because replica can be ahead PS and so it has to wait
|
||||||
|
/// until PS caught up (while for this particular page it is not needed).
|
||||||
|
///
|
||||||
|
/// We actually need to handle just three cases:
|
||||||
|
/// \[page_last_written_lsn, +inf\] - primary node
|
||||||
|
/// \[page_last_written_lsn, last_replay_lsn\] - hot-standby replica (receiving WAL from primary)
|
||||||
|
/// \[snapshot_lsn, snapshot_lsn\] - static RO replica (not receiving WAL fro primary)
|
||||||
|
///
|
||||||
|
/// Case \[0, lsn\] is not actually needed and added mostly for convenience as alias for \[lsn,lsn\]
|
||||||
|
|
||||||
async fn wait_or_get_last_lsn(
|
async fn wait_or_get_last_lsn(
|
||||||
timeline: &Timeline,
|
timeline: &Timeline,
|
||||||
mut lsn: Lsn,
|
lsn: Lsn,
|
||||||
latest: bool,
|
horizon: Lsn,
|
||||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Result<Lsn, PageStreamError> {
|
) -> Result<Lsn, PageStreamError> {
|
||||||
if latest {
|
let last_record_lsn = timeline.get_last_record_lsn();
|
||||||
// Latest page version was requested. If LSN is given, it is a hint
|
// Horizon = 0 (INVALID) is treated as LSN interval degenerated to point [lsn,lsn].
|
||||||
// to the page server that there have been no modifications to the
|
// It as done mostly for convenience (because such get_page commands are widely used in tests) and
|
||||||
// page after that LSN. If we haven't received WAL up to that point,
|
// also seems to be logical: Lsn::MAX moves upper boundary of LSN interval till last_record_lsn and
|
||||||
// wait until it arrives.
|
// Lsn(0) moves upper boundary to lower boundary.
|
||||||
let last_record_lsn = timeline.get_last_record_lsn();
|
let request_horizon = if horizon == Lsn::INVALID {
|
||||||
|
lsn
|
||||||
// Note: this covers the special case that lsn == Lsn(0). That
|
|
||||||
// special case means "return the latest version whatever it is",
|
|
||||||
// and it's used for bootstrapping purposes, when the page server is
|
|
||||||
// connected directly to the compute node. That is needed because
|
|
||||||
// when you connect to the compute node, to receive the WAL, the
|
|
||||||
// walsender process will do a look up in the pg_authid catalog
|
|
||||||
// table for authentication. That poses a deadlock problem: the
|
|
||||||
// catalog table lookup will send a GetPage request, but the GetPage
|
|
||||||
// request will block in the page server because the recent WAL
|
|
||||||
// hasn't been received yet, and it cannot be received until the
|
|
||||||
// walsender completes the authentication and starts streaming the
|
|
||||||
// WAL.
|
|
||||||
if lsn <= last_record_lsn {
|
|
||||||
lsn = last_record_lsn;
|
|
||||||
} else {
|
|
||||||
timeline
|
|
||||||
.wait_lsn(
|
|
||||||
lsn,
|
|
||||||
crate::tenant::timeline::WaitLsnWaiter::PageService,
|
|
||||||
ctx,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
// Since we waited for 'lsn' to arrive, that is now the last
|
|
||||||
// record LSN. (Or close enough for our purposes; the
|
|
||||||
// last-record LSN can advance immediately after we return
|
|
||||||
// anyway)
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
if lsn == Lsn(0) {
|
horizon
|
||||||
return Err(PageStreamError::BadRequest(
|
};
|
||||||
"invalid LSN(0) in request".into(),
|
let effective_lsn = Lsn::max(lsn, Lsn::min(request_horizon, last_record_lsn));
|
||||||
));
|
if effective_lsn > last_record_lsn {
|
||||||
}
|
|
||||||
timeline
|
timeline
|
||||||
.wait_lsn(
|
.wait_lsn(
|
||||||
lsn,
|
effective_lsn,
|
||||||
crate::tenant::timeline::WaitLsnWaiter::PageService,
|
crate::tenant::timeline::WaitLsnWaiter::PageService,
|
||||||
ctx,
|
ctx,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
// Since we waited for 'lsn' to arrive, that is now the last
|
||||||
|
// record LSN. (Or close enough for our purposes; the
|
||||||
|
// last-record LSN can advance immediately after we return
|
||||||
|
// anyway)
|
||||||
|
} else if effective_lsn == Lsn(0) {
|
||||||
|
return Err(PageStreamError::BadRequest(
|
||||||
|
"invalid LSN(0) in request".into(),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if lsn < **latest_gc_cutoff_lsn {
|
if effective_lsn < **latest_gc_cutoff_lsn {
|
||||||
return Err(PageStreamError::BadRequest(format!(
|
return Err(PageStreamError::BadRequest(format!(
|
||||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||||
lsn, **latest_gc_cutoff_lsn
|
effective_lsn, **latest_gc_cutoff_lsn
|
||||||
).into()));
|
).into()));
|
||||||
}
|
}
|
||||||
Ok(lsn)
|
Ok(effective_lsn)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[instrument(skip_all, fields(shard_id))]
|
#[instrument(skip_all, fields(shard_id))]
|
||||||
@@ -927,11 +924,11 @@ impl PageServerHandler {
|
|||||||
|
|
||||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||||
let lsn =
|
let lsn =
|
||||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let exists = timeline
|
let exists = timeline
|
||||||
.get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
|
.get_rel_exists(req.rel, Version::Lsn(lsn), req.horizon, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
|
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
|
||||||
@@ -955,11 +952,11 @@ impl PageServerHandler {
|
|||||||
|
|
||||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||||
let lsn =
|
let lsn =
|
||||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let n_blocks = timeline
|
let n_blocks = timeline
|
||||||
.get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
|
.get_rel_size(req.rel, Version::Lsn(lsn), req.horizon, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
|
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
|
||||||
@@ -983,7 +980,7 @@ impl PageServerHandler {
|
|||||||
|
|
||||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||||
let lsn =
|
let lsn =
|
||||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let total_blocks = timeline
|
let total_blocks = timeline
|
||||||
@@ -991,7 +988,7 @@ impl PageServerHandler {
|
|||||||
DEFAULTTABLESPACE_OID,
|
DEFAULTTABLESPACE_OID,
|
||||||
req.dbnode,
|
req.dbnode,
|
||||||
Version::Lsn(lsn),
|
Version::Lsn(lsn),
|
||||||
req.latest,
|
req.horizon,
|
||||||
ctx,
|
ctx,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
@@ -1161,11 +1158,11 @@ impl PageServerHandler {
|
|||||||
|
|
||||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||||
let lsn =
|
let lsn =
|
||||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let page = timeline
|
let page = timeline
|
||||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
|
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.horizon, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||||
@@ -1189,7 +1186,7 @@ impl PageServerHandler {
|
|||||||
|
|
||||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||||
let lsn =
|
let lsn =
|
||||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let kind = SlruKind::from_repr(req.kind)
|
let kind = SlruKind::from_repr(req.kind)
|
||||||
|
|||||||
@@ -175,7 +175,7 @@ impl Timeline {
|
|||||||
tag: RelTag,
|
tag: RelTag,
|
||||||
blknum: BlockNumber,
|
blknum: BlockNumber,
|
||||||
version: Version<'_>,
|
version: Version<'_>,
|
||||||
latest: bool,
|
horizon: Lsn,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Result<Bytes, PageReconstructError> {
|
) -> Result<Bytes, PageReconstructError> {
|
||||||
if tag.relnode == 0 {
|
if tag.relnode == 0 {
|
||||||
@@ -184,7 +184,7 @@ impl Timeline {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
|
let nblocks = self.get_rel_size(tag, version, horizon, ctx).await?;
|
||||||
if blknum >= nblocks {
|
if blknum >= nblocks {
|
||||||
debug!(
|
debug!(
|
||||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||||
@@ -206,7 +206,7 @@ impl Timeline {
|
|||||||
spcnode: Oid,
|
spcnode: Oid,
|
||||||
dbnode: Oid,
|
dbnode: Oid,
|
||||||
version: Version<'_>,
|
version: Version<'_>,
|
||||||
latest: bool,
|
horizon: Lsn,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Result<usize, PageReconstructError> {
|
) -> Result<usize, PageReconstructError> {
|
||||||
let mut total_blocks = 0;
|
let mut total_blocks = 0;
|
||||||
@@ -214,7 +214,7 @@ impl Timeline {
|
|||||||
let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
|
let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
|
||||||
|
|
||||||
for rel in rels {
|
for rel in rels {
|
||||||
let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
|
let n_blocks = self.get_rel_size(rel, version, horizon, ctx).await?;
|
||||||
total_blocks += n_blocks as usize;
|
total_blocks += n_blocks as usize;
|
||||||
}
|
}
|
||||||
Ok(total_blocks)
|
Ok(total_blocks)
|
||||||
@@ -225,7 +225,7 @@ impl Timeline {
|
|||||||
&self,
|
&self,
|
||||||
tag: RelTag,
|
tag: RelTag,
|
||||||
version: Version<'_>,
|
version: Version<'_>,
|
||||||
latest: bool,
|
horizon: Lsn,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Result<BlockNumber, PageReconstructError> {
|
) -> Result<BlockNumber, PageReconstructError> {
|
||||||
if tag.relnode == 0 {
|
if tag.relnode == 0 {
|
||||||
@@ -239,7 +239,7 @@ impl Timeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
|
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
|
||||||
&& !self.get_rel_exists(tag, version, latest, ctx).await?
|
&& !self.get_rel_exists(tag, version, horizon, ctx).await?
|
||||||
{
|
{
|
||||||
// FIXME: Postgres sometimes calls smgrcreate() to create
|
// FIXME: Postgres sometimes calls smgrcreate() to create
|
||||||
// FSM, and smgrnblocks() on it immediately afterwards,
|
// FSM, and smgrnblocks() on it immediately afterwards,
|
||||||
@@ -252,14 +252,8 @@ impl Timeline {
|
|||||||
let mut buf = version.get(self, key, ctx).await?;
|
let mut buf = version.get(self, key, ctx).await?;
|
||||||
let nblocks = buf.get_u32_le();
|
let nblocks = buf.get_u32_le();
|
||||||
|
|
||||||
if latest {
|
if horizon == Lsn::MAX {
|
||||||
// Update relation size cache only if "latest" flag is set.
|
// Update relation size cache only if latest version is requested.
|
||||||
// This flag is set by compute when it is working with most recent version of relation.
|
|
||||||
// Typically master compute node always set latest=true.
|
|
||||||
// Please notice, that even if compute node "by mistake" specifies old LSN but set
|
|
||||||
// latest=true, then it can not cause cache corruption, because with latest=true
|
|
||||||
// pageserver choose max(request_lsn, last_written_lsn) and so cached value will be
|
|
||||||
// associated with most recent value of LSN.
|
|
||||||
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
|
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
|
||||||
}
|
}
|
||||||
Ok(nblocks)
|
Ok(nblocks)
|
||||||
@@ -270,7 +264,7 @@ impl Timeline {
|
|||||||
&self,
|
&self,
|
||||||
tag: RelTag,
|
tag: RelTag,
|
||||||
version: Version<'_>,
|
version: Version<'_>,
|
||||||
_latest: bool,
|
_horizon: Lsn,
|
||||||
ctx: &RequestContext,
|
ctx: &RequestContext,
|
||||||
) -> Result<bool, PageReconstructError> {
|
) -> Result<bool, PageReconstructError> {
|
||||||
if tag.relnode == 0 {
|
if tag.relnode == 0 {
|
||||||
@@ -1088,7 +1082,7 @@ impl<'a> DatadirModification<'a> {
|
|||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let total_blocks = self
|
let total_blocks = self
|
||||||
.tline
|
.tline
|
||||||
.get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
|
.get_db_size(spcnode, dbnode, Version::Modified(self), Lsn::MAX, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
// Remove entry from dbdir
|
// Remove entry from dbdir
|
||||||
@@ -1187,7 +1181,7 @@ impl<'a> DatadirModification<'a> {
|
|||||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||||
if self
|
if self
|
||||||
.tline
|
.tline
|
||||||
.get_rel_exists(rel, Version::Modified(self), true, ctx)
|
.get_rel_exists(rel, Version::Modified(self), Lsn::MAX, ctx)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
let size_key = rel_size_to_key(rel);
|
let size_key = rel_size_to_key(rel);
|
||||||
|
|||||||
@@ -1034,7 +1034,7 @@ impl WalIngest {
|
|||||||
|
|
||||||
let nblocks = modification
|
let nblocks = modification
|
||||||
.tline
|
.tline
|
||||||
.get_rel_size(src_rel, Version::Modified(modification), true, ctx)
|
.get_rel_size(src_rel, Version::Modified(modification), Lsn::MAX, ctx)
|
||||||
.await?;
|
.await?;
|
||||||
let dst_rel = RelTag {
|
let dst_rel = RelTag {
|
||||||
spcnode: tablespace_id,
|
spcnode: tablespace_id,
|
||||||
@@ -1072,7 +1072,7 @@ impl WalIngest {
|
|||||||
src_rel,
|
src_rel,
|
||||||
blknum,
|
blknum,
|
||||||
Version::Modified(modification),
|
Version::Modified(modification),
|
||||||
true,
|
Lsn::MAX,
|
||||||
ctx,
|
ctx,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
@@ -1242,7 +1242,7 @@ impl WalIngest {
|
|||||||
};
|
};
|
||||||
if modification
|
if modification
|
||||||
.tline
|
.tline
|
||||||
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
|
.get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
self.put_rel_drop(modification, rel, ctx).await?;
|
self.put_rel_drop(modification, rel, ctx).await?;
|
||||||
@@ -1541,7 +1541,7 @@ impl WalIngest {
|
|||||||
nblocks
|
nblocks
|
||||||
} else if !modification
|
} else if !modification
|
||||||
.tline
|
.tline
|
||||||
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
|
.get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
// create it with 0 size initially, the logic below will extend it
|
// create it with 0 size initially, the logic below will extend it
|
||||||
@@ -1553,7 +1553,7 @@ impl WalIngest {
|
|||||||
} else {
|
} else {
|
||||||
modification
|
modification
|
||||||
.tline
|
.tline
|
||||||
.get_rel_size(rel, Version::Modified(modification), true, ctx)
|
.get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx)
|
||||||
.await?
|
.await?
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -1650,14 +1650,14 @@ async fn get_relsize(
|
|||||||
) -> anyhow::Result<BlockNumber> {
|
) -> anyhow::Result<BlockNumber> {
|
||||||
let nblocks = if !modification
|
let nblocks = if !modification
|
||||||
.tline
|
.tline
|
||||||
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
|
.get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
0
|
0
|
||||||
} else {
|
} else {
|
||||||
modification
|
modification
|
||||||
.tline
|
.tline
|
||||||
.get_rel_size(rel, Version::Modified(modification), true, ctx)
|
.get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx)
|
||||||
.await?
|
.await?
|
||||||
};
|
};
|
||||||
Ok(nblocks)
|
Ok(nblocks)
|
||||||
@@ -1732,29 +1732,29 @@ mod tests {
|
|||||||
// The relation was created at LSN 2, not visible at LSN 1 yet.
|
// The relation was created at LSN 2, not visible at LSN 1 yet.
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
|
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
false
|
false
|
||||||
);
|
);
|
||||||
assert!(tline
|
assert!(tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
|
||||||
.await
|
.await
|
||||||
.is_err());
|
.is_err());
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
true
|
true
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
1
|
1
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
3
|
3
|
||||||
);
|
);
|
||||||
@@ -1762,46 +1762,46 @@ mod tests {
|
|||||||
// Check page contents at each LSN
|
// Check page contents at each LSN
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img("foo blk 0 at 2")
|
test_img("foo blk 0 at 2")
|
||||||
);
|
);
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img("foo blk 0 at 3")
|
test_img("foo blk 0 at 3")
|
||||||
);
|
);
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img("foo blk 0 at 3")
|
test_img("foo blk 0 at 3")
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img("foo blk 1 at 4")
|
test_img("foo blk 1 at 4")
|
||||||
);
|
);
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img("foo blk 0 at 3")
|
test_img("foo blk 0 at 3")
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img("foo blk 1 at 4")
|
test_img("foo blk 1 at 4")
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img("foo blk 2 at 5")
|
test_img("foo blk 2 at 5")
|
||||||
);
|
);
|
||||||
@@ -1817,19 +1817,19 @@ mod tests {
|
|||||||
// Check reported size and contents after truncation
|
// Check reported size and contents after truncation
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
2
|
2
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img("foo blk 0 at 3")
|
test_img("foo blk 0 at 3")
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img("foo blk 1 at 4")
|
test_img("foo blk 1 at 4")
|
||||||
);
|
);
|
||||||
@@ -1837,13 +1837,13 @@ mod tests {
|
|||||||
// should still see the truncated block with older LSN
|
// should still see the truncated block with older LSN
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
3
|
3
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img("foo blk 2 at 5")
|
test_img("foo blk 2 at 5")
|
||||||
);
|
);
|
||||||
@@ -1856,7 +1856,7 @@ mod tests {
|
|||||||
m.commit(&ctx).await?;
|
m.commit(&ctx).await?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
0
|
0
|
||||||
);
|
);
|
||||||
@@ -1869,19 +1869,19 @@ mod tests {
|
|||||||
m.commit(&ctx).await?;
|
m.commit(&ctx).await?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
2
|
2
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
ZERO_PAGE
|
ZERO_PAGE
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img("foo blk 1")
|
test_img("foo blk 1")
|
||||||
);
|
);
|
||||||
@@ -1894,21 +1894,27 @@ mod tests {
|
|||||||
m.commit(&ctx).await?;
|
m.commit(&ctx).await?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
1501
|
1501
|
||||||
);
|
);
|
||||||
for blk in 2..1500 {
|
for blk in 2..1500 {
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx)
|
.get_rel_page_at_lsn(
|
||||||
|
TESTREL_A,
|
||||||
|
blk,
|
||||||
|
Version::Lsn(Lsn(0x80)),
|
||||||
|
Lsn::INVALID,
|
||||||
|
&ctx
|
||||||
|
)
|
||||||
.await?,
|
.await?,
|
||||||
ZERO_PAGE
|
ZERO_PAGE
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img("foo blk 1500")
|
test_img("foo blk 1500")
|
||||||
);
|
);
|
||||||
@@ -1935,13 +1941,13 @@ mod tests {
|
|||||||
// Check that rel exists and size is correct
|
// Check that rel exists and size is correct
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
true
|
true
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
1
|
1
|
||||||
);
|
);
|
||||||
@@ -1954,7 +1960,7 @@ mod tests {
|
|||||||
// Check that rel is not visible anymore
|
// Check that rel is not visible anymore
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx)
|
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
false
|
false
|
||||||
);
|
);
|
||||||
@@ -1972,13 +1978,13 @@ mod tests {
|
|||||||
// Check that rel exists and size is correct
|
// Check that rel exists and size is correct
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
|
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
true
|
true
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
1
|
1
|
||||||
);
|
);
|
||||||
@@ -2011,24 +2017,24 @@ mod tests {
|
|||||||
// The relation was created at LSN 20, not visible at LSN 1 yet.
|
// The relation was created at LSN 20, not visible at LSN 1 yet.
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
|
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
false
|
false
|
||||||
);
|
);
|
||||||
assert!(tline
|
assert!(tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
|
||||||
.await
|
.await
|
||||||
.is_err());
|
.is_err());
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
true
|
true
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
relsize
|
relsize
|
||||||
);
|
);
|
||||||
@@ -2039,7 +2045,7 @@ mod tests {
|
|||||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx)
|
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
test_img(&data)
|
test_img(&data)
|
||||||
);
|
);
|
||||||
@@ -2056,7 +2062,7 @@ mod tests {
|
|||||||
// Check reported size and contents after truncation
|
// Check reported size and contents after truncation
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
1
|
1
|
||||||
);
|
);
|
||||||
@@ -2066,7 +2072,13 @@ mod tests {
|
|||||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx)
|
.get_rel_page_at_lsn(
|
||||||
|
TESTREL_A,
|
||||||
|
blkno,
|
||||||
|
Version::Lsn(Lsn(0x60)),
|
||||||
|
Lsn::INVALID,
|
||||||
|
&ctx
|
||||||
|
)
|
||||||
.await?,
|
.await?,
|
||||||
test_img(&data)
|
test_img(&data)
|
||||||
);
|
);
|
||||||
@@ -2075,7 +2087,7 @@ mod tests {
|
|||||||
// should still see all blocks with older LSN
|
// should still see all blocks with older LSN
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
relsize
|
relsize
|
||||||
);
|
);
|
||||||
@@ -2084,7 +2096,13 @@ mod tests {
|
|||||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx)
|
.get_rel_page_at_lsn(
|
||||||
|
TESTREL_A,
|
||||||
|
blkno,
|
||||||
|
Version::Lsn(Lsn(0x50)),
|
||||||
|
Lsn::INVALID,
|
||||||
|
&ctx
|
||||||
|
)
|
||||||
.await?,
|
.await?,
|
||||||
test_img(&data)
|
test_img(&data)
|
||||||
);
|
);
|
||||||
@@ -2104,13 +2122,13 @@ mod tests {
|
|||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
|
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
true
|
true
|
||||||
);
|
);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
relsize
|
relsize
|
||||||
);
|
);
|
||||||
@@ -2120,7 +2138,13 @@ mod tests {
|
|||||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx)
|
.get_rel_page_at_lsn(
|
||||||
|
TESTREL_A,
|
||||||
|
blkno,
|
||||||
|
Version::Lsn(Lsn(0x80)),
|
||||||
|
Lsn::INVALID,
|
||||||
|
&ctx
|
||||||
|
)
|
||||||
.await?,
|
.await?,
|
||||||
test_img(&data)
|
test_img(&data)
|
||||||
);
|
);
|
||||||
@@ -2154,7 +2178,7 @@ mod tests {
|
|||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
RELSEG_SIZE + 1
|
RELSEG_SIZE + 1
|
||||||
);
|
);
|
||||||
@@ -2168,7 +2192,7 @@ mod tests {
|
|||||||
m.commit(&ctx).await?;
|
m.commit(&ctx).await?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
RELSEG_SIZE
|
RELSEG_SIZE
|
||||||
);
|
);
|
||||||
@@ -2183,7 +2207,7 @@ mod tests {
|
|||||||
m.commit(&ctx).await?;
|
m.commit(&ctx).await?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
RELSEG_SIZE - 1
|
RELSEG_SIZE - 1
|
||||||
);
|
);
|
||||||
@@ -2201,7 +2225,7 @@ mod tests {
|
|||||||
m.commit(&ctx).await?;
|
m.commit(&ctx).await?;
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tline
|
tline
|
||||||
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
|
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
|
||||||
.await?,
|
.await?,
|
||||||
size as BlockNumber
|
size as BlockNumber
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -49,6 +49,8 @@ char *neon_auth_token;
|
|||||||
int readahead_buffer_size = 128;
|
int readahead_buffer_size = 128;
|
||||||
int flush_every_n_requests = 8;
|
int flush_every_n_requests = 8;
|
||||||
|
|
||||||
|
int neon_protocol_version;
|
||||||
|
|
||||||
static int n_reconnect_attempts = 0;
|
static int n_reconnect_attempts = 0;
|
||||||
static int max_reconnect_attempts = 60;
|
static int max_reconnect_attempts = 60;
|
||||||
static int stripe_size;
|
static int stripe_size;
|
||||||
@@ -844,6 +846,14 @@ pg_init_libpagestore(void)
|
|||||||
PGC_USERSET,
|
PGC_USERSET,
|
||||||
0, /* no flags required */
|
0, /* no flags required */
|
||||||
NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL);
|
NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL);
|
||||||
|
DefineCustomIntVariable("neon.protocol_version",
|
||||||
|
"Version of compute<->page server protocol",
|
||||||
|
NULL,
|
||||||
|
&neon_protocol_version,
|
||||||
|
NEON_PROTOCOL_VERSION, 1, INT_MAX,
|
||||||
|
PGC_USERSET,
|
||||||
|
0, /* no flags required */
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
relsize_hash_init();
|
relsize_hash_init();
|
||||||
|
|
||||||
|
|||||||
@@ -28,10 +28,17 @@
|
|||||||
#define MAX_SHARDS 128
|
#define MAX_SHARDS 128
|
||||||
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
|
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Right now protocal version is not set to the server.
|
||||||
|
* So it is ciritical that format of existed commands is not changed.
|
||||||
|
* New protocl versions can just add new commands.
|
||||||
|
*/
|
||||||
|
#define NEON_PROTOCOL_VERSION 2
|
||||||
|
|
||||||
typedef enum
|
typedef enum
|
||||||
{
|
{
|
||||||
/* pagestore_client -> pagestore */
|
/* pagestore_client -> pagestore */
|
||||||
T_NeonExistsRequest = 0,
|
T_NeonExistsRequest = 10, /* new protocol message tags start from 10 */
|
||||||
T_NeonNblocksRequest,
|
T_NeonNblocksRequest,
|
||||||
T_NeonGetPageRequest,
|
T_NeonGetPageRequest,
|
||||||
T_NeonDbSizeRequest,
|
T_NeonDbSizeRequest,
|
||||||
@@ -72,14 +79,20 @@ typedef enum {
|
|||||||
/*
|
/*
|
||||||
* supertype of all the Neon*Request structs below
|
* supertype of all the Neon*Request structs below
|
||||||
*
|
*
|
||||||
* If 'latest' is true, we are requesting the latest page version, and 'lsn'
|
* In old version of Neon we have 'latest' flag indicating that we are requesting the latest page version, and 'lsn'
|
||||||
* is just a hint to the server that we know there are no versions of the page
|
* is just a hint to the server that we know there are no versions of the page
|
||||||
* (or relation size, for exists/nblocks requests) later than the 'lsn'.
|
* (or relation size, for exists/nblocks requests) later than the 'lsn'.
|
||||||
|
*
|
||||||
|
* But it doesn't work for hot-standby replica because it may be not at the latest LSN position.
|
||||||
|
* So we need to be able to specify upper boundary for LSN which page server can send to us.
|
||||||
|
* This is why 'latest' flag is replaced with 'horizon'. MAX_LSN=~0 value of 'horizon' means that we are requesting latest version.
|
||||||
|
* If we need version on exact LSN (for static RO replicas), 'horizon' should be set to 0: in this case range [lsn,lsn] is used by page server.
|
||||||
|
* Otherwise for hot-standby replica we specify in 'horizon' current replay position.
|
||||||
*/
|
*/
|
||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
NeonMessageTag tag;
|
NeonMessageTag tag;
|
||||||
bool latest; /* if true, request latest page version */
|
XLogRecPtr horizon; /* upper boundary for page LSN */
|
||||||
XLogRecPtr lsn; /* request page version @ this LSN */
|
XLogRecPtr lsn; /* request page version @ this LSN */
|
||||||
} NeonRequest;
|
} NeonRequest;
|
||||||
|
|
||||||
@@ -193,6 +206,7 @@ extern int readahead_buffer_size;
|
|||||||
extern char *neon_timeline;
|
extern char *neon_timeline;
|
||||||
extern char *neon_tenant;
|
extern char *neon_tenant;
|
||||||
extern int32 max_cluster_size;
|
extern int32 max_cluster_size;
|
||||||
|
extern int neon_protocol_version;
|
||||||
|
|
||||||
extern shardno_t get_shard_number(BufferTag* tag);
|
extern shardno_t get_shard_number(BufferTag* tag);
|
||||||
|
|
||||||
|
|||||||
@@ -110,6 +110,20 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
|||||||
static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
|
static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
|
||||||
static bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
|
static bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
|
||||||
|
|
||||||
|
#define MAX_LSN ((XLogRecPtr)~0)
|
||||||
|
|
||||||
|
/*
|
||||||
|
* There are three kinds of get_page :
|
||||||
|
* 1. Master compute: get the latest page not older than specified LSN (horizon=Lsn::MAX)
|
||||||
|
* 2. RO replica: get the latest page not newer than current WAL position replica already applied (horizon=GetXLogReplayRecPtr(NULL))
|
||||||
|
* 3. Snapshot: get latest page not new than specified LSN (horizon=request_lsn)
|
||||||
|
*/
|
||||||
|
static XLogRecPtr
|
||||||
|
neon_get_horizon(bool latest)
|
||||||
|
{
|
||||||
|
return latest ? MAX_LSN : RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : InvalidXLogRecPtr; /* horizon=InvalidXlogRecPtr is replaced with request_lsn at PS */
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Prefetch implementation:
|
* Prefetch implementation:
|
||||||
*
|
*
|
||||||
@@ -687,9 +701,10 @@ static void
|
|||||||
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
|
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
|
||||||
{
|
{
|
||||||
bool found;
|
bool found;
|
||||||
|
bool latest;
|
||||||
NeonGetPageRequest request = {
|
NeonGetPageRequest request = {
|
||||||
.req.tag = T_NeonGetPageRequest,
|
.req.tag = T_NeonGetPageRequest,
|
||||||
.req.latest = false,
|
.req.horizon = 0,
|
||||||
.req.lsn = 0,
|
.req.lsn = 0,
|
||||||
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
|
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
|
||||||
.forknum = slot->buftag.forkNum,
|
.forknum = slot->buftag.forkNum,
|
||||||
@@ -699,13 +714,13 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
|
|||||||
if (force_lsn && force_latest)
|
if (force_lsn && force_latest)
|
||||||
{
|
{
|
||||||
request.req.lsn = *force_lsn;
|
request.req.lsn = *force_lsn;
|
||||||
request.req.latest = *force_latest;
|
latest = *force_latest;
|
||||||
slot->actual_request_lsn = slot->effective_request_lsn = *force_lsn;
|
slot->actual_request_lsn = slot->effective_request_lsn = *force_lsn;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
XLogRecPtr lsn = neon_get_request_lsn(
|
XLogRecPtr lsn = neon_get_request_lsn(
|
||||||
&request.req.latest,
|
&latest,
|
||||||
BufTagGetNRelFileInfo(slot->buftag),
|
BufTagGetNRelFileInfo(slot->buftag),
|
||||||
slot->buftag.forkNum,
|
slot->buftag.forkNum,
|
||||||
slot->buftag.blockNum
|
slot->buftag.blockNum
|
||||||
@@ -733,6 +748,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
|
|||||||
prefetch_lsn = Max(prefetch_lsn, lsn);
|
prefetch_lsn = Max(prefetch_lsn, lsn);
|
||||||
slot->effective_request_lsn = prefetch_lsn;
|
slot->effective_request_lsn = prefetch_lsn;
|
||||||
}
|
}
|
||||||
|
request.req.horizon = neon_get_horizon(latest);
|
||||||
|
|
||||||
Assert(slot->response == NULL);
|
Assert(slot->response == NULL);
|
||||||
Assert(slot->my_ring_index == MyPState->ring_unused);
|
Assert(slot->my_ring_index == MyPState->ring_unused);
|
||||||
@@ -997,7 +1013,19 @@ nm_pack_request(NeonRequest *msg)
|
|||||||
StringInfoData s;
|
StringInfoData s;
|
||||||
|
|
||||||
initStringInfo(&s);
|
initStringInfo(&s);
|
||||||
pq_sendbyte(&s, msg->tag);
|
|
||||||
|
if (neon_protocol_version >= 2)
|
||||||
|
{
|
||||||
|
pq_sendbyte(&s, msg->tag);
|
||||||
|
pq_sendint64(&s, msg->horizon);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* Old protocol with latest flag */
|
||||||
|
pq_sendbyte(&s, msg->tag - T_NeonExistsRequest); /* old protocol command tags start from zero */
|
||||||
|
pq_sendbyte(&s, msg->horizon == MAX_LSN);
|
||||||
|
}
|
||||||
|
pq_sendint64(&s, msg->lsn);
|
||||||
|
|
||||||
switch (messageTag(msg))
|
switch (messageTag(msg))
|
||||||
{
|
{
|
||||||
@@ -1006,8 +1034,6 @@ nm_pack_request(NeonRequest *msg)
|
|||||||
{
|
{
|
||||||
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
|
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
|
||||||
|
|
||||||
pq_sendbyte(&s, msg_req->req.latest);
|
|
||||||
pq_sendint64(&s, msg_req->req.lsn);
|
|
||||||
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
||||||
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
||||||
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
||||||
@@ -1019,8 +1045,6 @@ nm_pack_request(NeonRequest *msg)
|
|||||||
{
|
{
|
||||||
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
|
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
|
||||||
|
|
||||||
pq_sendbyte(&s, msg_req->req.latest);
|
|
||||||
pq_sendint64(&s, msg_req->req.lsn);
|
|
||||||
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
||||||
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
||||||
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
||||||
@@ -1032,8 +1056,6 @@ nm_pack_request(NeonRequest *msg)
|
|||||||
{
|
{
|
||||||
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
|
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
|
||||||
|
|
||||||
pq_sendbyte(&s, msg_req->req.latest);
|
|
||||||
pq_sendint64(&s, msg_req->req.lsn);
|
|
||||||
pq_sendint32(&s, msg_req->dbNode);
|
pq_sendint32(&s, msg_req->dbNode);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
@@ -1042,8 +1064,6 @@ nm_pack_request(NeonRequest *msg)
|
|||||||
{
|
{
|
||||||
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
|
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
|
||||||
|
|
||||||
pq_sendbyte(&s, msg_req->req.latest);
|
|
||||||
pq_sendint64(&s, msg_req->req.lsn);
|
|
||||||
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
||||||
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
||||||
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
|
||||||
@@ -1057,8 +1077,6 @@ nm_pack_request(NeonRequest *msg)
|
|||||||
{
|
{
|
||||||
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
|
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
|
||||||
|
|
||||||
pq_sendbyte(&s, msg_req->req.latest);
|
|
||||||
pq_sendint64(&s, msg_req->req.lsn);
|
|
||||||
pq_sendbyte(&s, msg_req->kind);
|
pq_sendbyte(&s, msg_req->kind);
|
||||||
pq_sendint32(&s, msg_req->segno);
|
pq_sendint32(&s, msg_req->segno);
|
||||||
|
|
||||||
@@ -1209,7 +1227,7 @@ nm_to_string(NeonMessage *msg)
|
|||||||
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
|
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
|
||||||
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
||||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
|
||||||
appendStringInfoChar(&s, '}');
|
appendStringInfoChar(&s, '}');
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -1222,7 +1240,7 @@ nm_to_string(NeonMessage *msg)
|
|||||||
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
|
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
|
||||||
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
||||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
|
||||||
appendStringInfoChar(&s, '}');
|
appendStringInfoChar(&s, '}');
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -1236,7 +1254,7 @@ nm_to_string(NeonMessage *msg)
|
|||||||
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
||||||
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
|
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
|
||||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
|
||||||
appendStringInfoChar(&s, '}');
|
appendStringInfoChar(&s, '}');
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -1247,7 +1265,7 @@ nm_to_string(NeonMessage *msg)
|
|||||||
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
|
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
|
||||||
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode);
|
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode);
|
||||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
|
||||||
appendStringInfoChar(&s, '}');
|
appendStringInfoChar(&s, '}');
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -1259,7 +1277,7 @@ nm_to_string(NeonMessage *msg)
|
|||||||
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
|
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
|
||||||
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
|
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
|
||||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
|
||||||
appendStringInfoChar(&s, '}');
|
appendStringInfoChar(&s, '}');
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -1664,7 +1682,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
|||||||
{
|
{
|
||||||
NeonExistsRequest request = {
|
NeonExistsRequest request = {
|
||||||
.req.tag = T_NeonExistsRequest,
|
.req.tag = T_NeonExistsRequest,
|
||||||
.req.latest = latest,
|
.req.horizon = neon_get_horizon(latest),
|
||||||
.req.lsn = request_lsn,
|
.req.lsn = request_lsn,
|
||||||
.rinfo = InfoFromSMgrRel(reln),
|
.rinfo = InfoFromSMgrRel(reln),
|
||||||
.forknum = forkNum};
|
.forknum = forkNum};
|
||||||
@@ -2474,7 +2492,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
|||||||
{
|
{
|
||||||
NeonNblocksRequest request = {
|
NeonNblocksRequest request = {
|
||||||
.req.tag = T_NeonNblocksRequest,
|
.req.tag = T_NeonNblocksRequest,
|
||||||
.req.latest = latest,
|
.req.horizon = neon_get_horizon(latest),
|
||||||
.req.lsn = request_lsn,
|
.req.lsn = request_lsn,
|
||||||
.rinfo = InfoFromSMgrRel(reln),
|
.rinfo = InfoFromSMgrRel(reln),
|
||||||
.forknum = forknum,
|
.forknum = forknum,
|
||||||
@@ -2531,7 +2549,7 @@ neon_dbsize(Oid dbNode)
|
|||||||
{
|
{
|
||||||
NeonDbSizeRequest request = {
|
NeonDbSizeRequest request = {
|
||||||
.req.tag = T_NeonDbSizeRequest,
|
.req.tag = T_NeonDbSizeRequest,
|
||||||
.req.latest = latest,
|
.req.horizon = neon_get_horizon(latest),
|
||||||
.req.lsn = request_lsn,
|
.req.lsn = request_lsn,
|
||||||
.dbNode = dbNode,
|
.dbNode = dbNode,
|
||||||
};
|
};
|
||||||
@@ -2827,7 +2845,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
|
|||||||
NeonResponse *resp;
|
NeonResponse *resp;
|
||||||
NeonGetSlruSegmentRequest request = {
|
NeonGetSlruSegmentRequest request = {
|
||||||
.req.tag = T_NeonGetSlruSegmentRequest,
|
.req.tag = T_NeonGetSlruSegmentRequest,
|
||||||
.req.latest = false,
|
.req.horizon = InvalidXLogRecPtr,
|
||||||
.req.lsn = request_lsn,
|
.req.lsn = request_lsn,
|
||||||
|
|
||||||
.kind = kind,
|
.kind = kind,
|
||||||
@@ -2980,7 +2998,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
|||||||
NeonNblocksRequest request = {
|
NeonNblocksRequest request = {
|
||||||
.req = (NeonRequest) {
|
.req = (NeonRequest) {
|
||||||
.lsn = end_recptr,
|
.lsn = end_recptr,
|
||||||
.latest = false,
|
.horizon = neon_get_horizon(false),
|
||||||
.tag = T_NeonNblocksRequest,
|
.tag = T_NeonNblocksRequest,
|
||||||
},
|
},
|
||||||
.rinfo = rinfo,
|
.rinfo = rinfo,
|
||||||
|
|||||||
9
test_runner/regress/test_protocol_version.py
Normal file
9
test_runner/regress/test_protocol_version.py
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
from fixtures.neon_fixtures import NeonEnv
|
||||||
|
|
||||||
|
|
||||||
|
def test_protocol_version(neon_simple_env: NeonEnv):
|
||||||
|
env = neon_simple_env
|
||||||
|
endpoint = env.endpoints.create_start("main", config_lines=["neon.protocol_version=1"])
|
||||||
|
cur = endpoint.connect().cursor()
|
||||||
|
cur.execute("show neon.protocol_version")
|
||||||
|
assert cur.fetchone() == ("1",)
|
||||||
Reference in New Issue
Block a user