Compare commits

..

8 Commits

Author SHA1 Message Date
Konstantin Knizhnik
de649f856c Fix documentation format issues 2024-04-13 22:37:39 +03:00
Konstantin Knizhnik
de3fdf9860 Add more comments 2024-04-13 21:47:01 +03:00
Konstantin Knizhnik
1b2cfc0259 Proivide comment for NeonRequest struct 2024-04-11 17:24:39 +03:00
Konstantin Knizhnik
165a1d7bf1 Make ruff happy 2024-04-11 09:15:35 +03:00
Konstantin Knizhnik
f07c33186a Add neon.protocol_version GUC 2024-04-11 09:15:35 +03:00
Konstantin Knizhnik
15c0e1351a Fix messages tags in PS serialize 2024-04-11 09:15:35 +03:00
Konstantin Knizhnik
ccbf95e9dc Use tags starting from 10 for command of new protocol 2024-04-11 09:15:34 +03:00
Konstantin Knizhnik
93e6046005 Send LSN range in getpage request 2024-04-11 09:15:31 +03:00
19 changed files with 281 additions and 377 deletions

1
Cargo.lock generated
View File

@@ -4320,7 +4320,6 @@ dependencies = [
"hyper-util",
"ipnet",
"itertools",
"jsonwebtoken",
"lasso",
"md5",
"metrics",

View File

@@ -841,21 +841,21 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamExistsRequest {
pub latest: bool,
pub horizon: Lsn,
pub lsn: Lsn,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamNblocksRequest {
pub latest: bool,
pub horizon: Lsn,
pub lsn: Lsn,
pub rel: RelTag,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetPageRequest {
pub latest: bool,
pub horizon: Lsn,
pub lsn: Lsn,
pub rel: RelTag,
pub blkno: u32,
@@ -863,14 +863,14 @@ pub struct PagestreamGetPageRequest {
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamDbSizeRequest {
pub latest: bool,
pub horizon: Lsn,
pub lsn: Lsn,
pub dbnode: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetSlruSegmentRequest {
pub latest: bool,
pub horizon: Lsn,
pub lsn: Lsn,
pub kind: u8,
pub segno: u32,
@@ -923,8 +923,8 @@ impl PagestreamFeMessage {
match self {
Self::Exists(req) => {
bytes.put_u8(0);
bytes.put_u8(u8::from(req.latest));
bytes.put_u8(10);
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
@@ -933,8 +933,8 @@ impl PagestreamFeMessage {
}
Self::Nblocks(req) => {
bytes.put_u8(1);
bytes.put_u8(u8::from(req.latest));
bytes.put_u8(11);
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
@@ -943,8 +943,8 @@ impl PagestreamFeMessage {
}
Self::GetPage(req) => {
bytes.put_u8(2);
bytes.put_u8(u8::from(req.latest));
bytes.put_u8(12);
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
@@ -954,15 +954,15 @@ impl PagestreamFeMessage {
}
Self::DbSize(req) => {
bytes.put_u8(3);
bytes.put_u8(u8::from(req.latest));
bytes.put_u8(13);
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.dbnode);
}
Self::GetSlruSegment(req) => {
bytes.put_u8(4);
bytes.put_u8(u8::from(req.latest));
bytes.put_u8(14);
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u8(req.kind);
bytes.put_u32(req.segno);
@@ -979,11 +979,32 @@ impl PagestreamFeMessage {
//
// TODO: consider using protobuf or serde bincode for less error prone
// serialization.
let msg_tag = body.read_u8()?;
let mut msg_tag = body.read_u8()?;
//
// Old version of protocol use commands with tags started with 0 and containing `latest` flag.
// New version of protocol shift command tags by 10 and pass LSN range instead of `latest` flag.
// Server should be able to handle both protocol version. As far as we are not passing no=w,
// protocol version from client to server, we make a decision based on tag range.
// So this code actually provides backward compatibility.
//
let horizon = if msg_tag >= 10 {
// new protocol
msg_tag -= 10; // commands tags in new protocol starts with 10
Lsn::from(body.read_u64::<BigEndian>()?)
} else {
// old_protocol
let latest = body.read_u8()? != 0;
if latest {
Lsn::MAX // get latest version
} else {
Lsn::INVALID // get version on specified LSN
}
};
let lsn = Lsn::from(body.read_u64::<BigEndian>()?);
match msg_tag {
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
horizon,
lsn,
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
@@ -992,8 +1013,8 @@ impl PagestreamFeMessage {
},
})),
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
horizon,
lsn,
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
@@ -1002,8 +1023,8 @@ impl PagestreamFeMessage {
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
horizon,
lsn,
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
@@ -1013,14 +1034,14 @@ impl PagestreamFeMessage {
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
horizon,
lsn,
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
horizon,
lsn,
kind: body.read_u8()?,
segno: body.read_u32::<BigEndian>()?,
},
@@ -1148,7 +1169,7 @@ mod tests {
// Test serialization/deserialization of PagestreamFeMessage
let messages = vec![
PagestreamFeMessage::Exists(PagestreamExistsRequest {
latest: true,
horizon: Lsn::MAX,
lsn: Lsn(4),
rel: RelTag {
forknum: 1,
@@ -1158,7 +1179,7 @@ mod tests {
},
}),
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
latest: false,
horizon: Lsn::INVALID,
lsn: Lsn(4),
rel: RelTag {
forknum: 1,
@@ -1168,8 +1189,8 @@ mod tests {
},
}),
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
latest: true,
lsn: Lsn(4),
horizon: Lsn::MAX,
lsn: Lsn::INVALID,
rel: RelTag {
forknum: 1,
spcnode: 2,
@@ -1179,7 +1200,7 @@ mod tests {
blkno: 7,
}),
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
latest: true,
horizon: Lsn::MAX,
lsn: Lsn(4),
dbnode: 7,
}),

View File

@@ -312,7 +312,11 @@ async fn main_impl(
let (rel_tag, block_no) =
key_to_rel_block(key).expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
latest: rng.gen_bool(args.req_latest_probability),
horizon: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
lsn: r.timeline_lsn,
rel: rel_tag,
blkno: block_no,

View File

@@ -361,9 +361,10 @@ where
/// Add contents of relfilenode `src`, naming it as `dst`.
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
let horizon = self.lsn; // we do not need latest version
let nblocks = self
.timeline
.get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx)
.get_rel_size(src, Version::Lsn(self.lsn), horizon, self.ctx)
.await?;
// If the relation is empty, create an empty file
@@ -384,7 +385,7 @@ where
for blknum in startblk..endblk {
let img = self
.timeline
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx)
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), horizon, self.ctx)
.await?;
segment_data.extend_from_slice(&img[..]);
}

View File

@@ -847,69 +847,66 @@ impl PageServerHandler {
/// In either case, if the page server hasn't received the WAL up to the
/// requested LSN yet, we will wait for it to arrive. The return value is
/// the LSN that should be used to look up the page versions.
///
/// Compute needs to specify:
/// 1. "desired" LSN - which LSN compute expects to be acceptable
/// 2. Upper boundary LSN - PS should not send page with greater LSN to preserver consistency
///
/// In case of primary node then upper boundary is always +inf: nobody except this node can produce more recent version of the page.
/// In case of replica it is not true: replica can lag from primary node and PS and should not receive pages newer than its last_replay_lsn.
/// But it is not good always to request pages at `last_replay_lsn` because replica can be ahead PS and so it has to wait
/// until PS caught up (while for this particular page it is not needed).
///
/// We actually need to handle just three cases:
/// \[page_last_written_lsn, +inf\] - primary node
/// \[page_last_written_lsn, last_replay_lsn\] - hot-standby replica (receiving WAL from primary)
/// \[snapshot_lsn, snapshot_lsn\] - static RO replica (not receiving WAL fro primary)
///
/// Case \[0, lsn\] is not actually needed and added mostly for convenience as alias for \[lsn,lsn\]
async fn wait_or_get_last_lsn(
timeline: &Timeline,
mut lsn: Lsn,
latest: bool,
lsn: Lsn,
horizon: Lsn,
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
ctx: &RequestContext,
) -> Result<Lsn, PageStreamError> {
if latest {
// Latest page version was requested. If LSN is given, it is a hint
// to the page server that there have been no modifications to the
// page after that LSN. If we haven't received WAL up to that point,
// wait until it arrives.
let last_record_lsn = timeline.get_last_record_lsn();
// Note: this covers the special case that lsn == Lsn(0). That
// special case means "return the latest version whatever it is",
// and it's used for bootstrapping purposes, when the page server is
// connected directly to the compute node. That is needed because
// when you connect to the compute node, to receive the WAL, the
// walsender process will do a look up in the pg_authid catalog
// table for authentication. That poses a deadlock problem: the
// catalog table lookup will send a GetPage request, but the GetPage
// request will block in the page server because the recent WAL
// hasn't been received yet, and it cannot be received until the
// walsender completes the authentication and starts streaming the
// WAL.
if lsn <= last_record_lsn {
lsn = last_record_lsn;
} else {
timeline
.wait_lsn(
lsn,
crate::tenant::timeline::WaitLsnWaiter::PageService,
ctx,
)
.await?;
// Since we waited for 'lsn' to arrive, that is now the last
// record LSN. (Or close enough for our purposes; the
// last-record LSN can advance immediately after we return
// anyway)
}
let last_record_lsn = timeline.get_last_record_lsn();
// Horizon = 0 (INVALID) is treated as LSN interval degenerated to point [lsn,lsn].
// It as done mostly for convenience (because such get_page commands are widely used in tests) and
// also seems to be logical: Lsn::MAX moves upper boundary of LSN interval till last_record_lsn and
// Lsn(0) moves upper boundary to lower boundary.
let request_horizon = if horizon == Lsn::INVALID {
lsn
} else {
if lsn == Lsn(0) {
return Err(PageStreamError::BadRequest(
"invalid LSN(0) in request".into(),
));
}
horizon
};
let effective_lsn = Lsn::max(lsn, Lsn::min(request_horizon, last_record_lsn));
if effective_lsn > last_record_lsn {
timeline
.wait_lsn(
lsn,
effective_lsn,
crate::tenant::timeline::WaitLsnWaiter::PageService,
ctx,
)
.await?;
// Since we waited for 'lsn' to arrive, that is now the last
// record LSN. (Or close enough for our purposes; the
// last-record LSN can advance immediately after we return
// anyway)
} else if effective_lsn == Lsn(0) {
return Err(PageStreamError::BadRequest(
"invalid LSN(0) in request".into(),
));
}
if lsn < **latest_gc_cutoff_lsn {
if effective_lsn < **latest_gc_cutoff_lsn {
return Err(PageStreamError::BadRequest(format!(
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
lsn, **latest_gc_cutoff_lsn
effective_lsn, **latest_gc_cutoff_lsn
).into()));
}
Ok(lsn)
Ok(effective_lsn)
}
#[instrument(skip_all, fields(shard_id))]
@@ -927,11 +924,11 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
.await?;
let exists = timeline
.get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
.get_rel_exists(req.rel, Version::Lsn(lsn), req.horizon, ctx)
.await?;
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
@@ -955,11 +952,11 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
.await?;
let n_blocks = timeline
.get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
.get_rel_size(req.rel, Version::Lsn(lsn), req.horizon, ctx)
.await?;
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
@@ -983,7 +980,7 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
.await?;
let total_blocks = timeline
@@ -991,7 +988,7 @@ impl PageServerHandler {
DEFAULTTABLESPACE_OID,
req.dbnode,
Version::Lsn(lsn),
req.latest,
req.horizon,
ctx,
)
.await?;
@@ -1161,11 +1158,11 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
.await?;
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.horizon, ctx)
.await?;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -1189,7 +1186,7 @@ impl PageServerHandler {
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn =
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
.await?;
let kind = SlruKind::from_repr(req.kind)

View File

@@ -175,7 +175,7 @@ impl Timeline {
tag: RelTag,
blknum: BlockNumber,
version: Version<'_>,
latest: bool,
horizon: Lsn,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
@@ -184,7 +184,7 @@ impl Timeline {
));
}
let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
let nblocks = self.get_rel_size(tag, version, horizon, ctx).await?;
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
@@ -206,7 +206,7 @@ impl Timeline {
spcnode: Oid,
dbnode: Oid,
version: Version<'_>,
latest: bool,
horizon: Lsn,
ctx: &RequestContext,
) -> Result<usize, PageReconstructError> {
let mut total_blocks = 0;
@@ -214,7 +214,7 @@ impl Timeline {
let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
for rel in rels {
let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
let n_blocks = self.get_rel_size(rel, version, horizon, ctx).await?;
total_blocks += n_blocks as usize;
}
Ok(total_blocks)
@@ -225,7 +225,7 @@ impl Timeline {
&self,
tag: RelTag,
version: Version<'_>,
latest: bool,
horizon: Lsn,
ctx: &RequestContext,
) -> Result<BlockNumber, PageReconstructError> {
if tag.relnode == 0 {
@@ -239,7 +239,7 @@ impl Timeline {
}
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
&& !self.get_rel_exists(tag, version, latest, ctx).await?
&& !self.get_rel_exists(tag, version, horizon, ctx).await?
{
// FIXME: Postgres sometimes calls smgrcreate() to create
// FSM, and smgrnblocks() on it immediately afterwards,
@@ -252,14 +252,8 @@ impl Timeline {
let mut buf = version.get(self, key, ctx).await?;
let nblocks = buf.get_u32_le();
if latest {
// Update relation size cache only if "latest" flag is set.
// This flag is set by compute when it is working with most recent version of relation.
// Typically master compute node always set latest=true.
// Please notice, that even if compute node "by mistake" specifies old LSN but set
// latest=true, then it can not cause cache corruption, because with latest=true
// pageserver choose max(request_lsn, last_written_lsn) and so cached value will be
// associated with most recent value of LSN.
if horizon == Lsn::MAX {
// Update relation size cache only if latest version is requested.
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
}
Ok(nblocks)
@@ -270,7 +264,7 @@ impl Timeline {
&self,
tag: RelTag,
version: Version<'_>,
_latest: bool,
_horizon: Lsn,
ctx: &RequestContext,
) -> Result<bool, PageReconstructError> {
if tag.relnode == 0 {
@@ -1088,7 +1082,7 @@ impl<'a> DatadirModification<'a> {
) -> anyhow::Result<()> {
let total_blocks = self
.tline
.get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
.get_db_size(spcnode, dbnode, Version::Modified(self), Lsn::MAX, ctx)
.await?;
// Remove entry from dbdir
@@ -1187,7 +1181,7 @@ impl<'a> DatadirModification<'a> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
if self
.tline
.get_rel_exists(rel, Version::Modified(self), true, ctx)
.get_rel_exists(rel, Version::Modified(self), Lsn::MAX, ctx)
.await?
{
let size_key = rel_size_to_key(rel);

View File

@@ -1034,7 +1034,7 @@ impl WalIngest {
let nblocks = modification
.tline
.get_rel_size(src_rel, Version::Modified(modification), true, ctx)
.get_rel_size(src_rel, Version::Modified(modification), Lsn::MAX, ctx)
.await?;
let dst_rel = RelTag {
spcnode: tablespace_id,
@@ -1072,7 +1072,7 @@ impl WalIngest {
src_rel,
blknum,
Version::Modified(modification),
true,
Lsn::MAX,
ctx,
)
.await?;
@@ -1242,7 +1242,7 @@ impl WalIngest {
};
if modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
.await?
{
self.put_rel_drop(modification, rel, ctx).await?;
@@ -1541,7 +1541,7 @@ impl WalIngest {
nblocks
} else if !modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
.await?
{
// create it with 0 size initially, the logic below will extend it
@@ -1553,7 +1553,7 @@ impl WalIngest {
} else {
modification
.tline
.get_rel_size(rel, Version::Modified(modification), true, ctx)
.get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx)
.await?
};
@@ -1650,14 +1650,14 @@ async fn get_relsize(
) -> anyhow::Result<BlockNumber> {
let nblocks = if !modification
.tline
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
.get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
.await?
{
0
} else {
modification
.tline
.get_rel_size(rel, Version::Modified(modification), true, ctx)
.get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx)
.await?
};
Ok(nblocks)
@@ -1732,29 +1732,29 @@ mod tests {
// The relation was created at LSN 2, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
.await?,
false
);
assert!(tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
.await
.is_err());
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
1
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
3
);
@@ -1762,46 +1762,46 @@ mod tests {
// Check page contents at each LSN
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 0 at 2")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x30)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 1 at 4")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 2 at 5")
);
@@ -1817,19 +1817,19 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
.await?,
2
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 0 at 3")
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 1 at 4")
);
@@ -1837,13 +1837,13 @@ mod tests {
// should still see the truncated block with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
3
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 2, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 2 at 5")
);
@@ -1856,7 +1856,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x68)), Lsn::INVALID, &ctx)
.await?,
0
);
@@ -1869,19 +1869,19 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx)
.await?,
2
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 0, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx)
.await?,
ZERO_PAGE
);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1, Version::Lsn(Lsn(0x70)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 1")
);
@@ -1894,21 +1894,27 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
.await?,
1501
);
for blk in 2..1500 {
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blk, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
blk,
Version::Lsn(Lsn(0x80)),
Lsn::INVALID,
&ctx
)
.await?,
ZERO_PAGE
);
}
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, 1500, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
.await?,
test_img("foo blk 1500")
);
@@ -1935,13 +1941,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
1
);
@@ -1954,7 +1960,7 @@ mod tests {
// Check that rel is not visible anymore
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x30)), Lsn::INVALID, &ctx)
.await?,
false
);
@@ -1972,13 +1978,13 @@ mod tests {
// Check that rel exists and size is correct
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x40)), Lsn::INVALID, &ctx)
.await?,
1
);
@@ -2011,24 +2017,24 @@ mod tests {
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
.await?,
false
);
assert!(tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x10)), Lsn::INVALID, &ctx)
.await
.is_err());
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x20)), Lsn::INVALID, &ctx)
.await?,
relsize
);
@@ -2039,7 +2045,7 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), false, &ctx)
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(lsn), Lsn::INVALID, &ctx)
.await?,
test_img(&data)
);
@@ -2056,7 +2062,7 @@ mod tests {
// Check reported size and contents after truncation
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x60)), Lsn::INVALID, &ctx)
.await?,
1
);
@@ -2066,7 +2072,13 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x60)), false, &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x60)),
Lsn::INVALID,
&ctx
)
.await?,
test_img(&data)
);
@@ -2075,7 +2087,7 @@ mod tests {
// should still see all blocks with older LSN
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x50)), Lsn::INVALID, &ctx)
.await?,
relsize
);
@@ -2084,7 +2096,13 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x50)), false, &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x50)),
Lsn::INVALID,
&ctx
)
.await?,
test_img(&data)
);
@@ -2104,13 +2122,13 @@ mod tests {
assert_eq!(
tline
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_exists(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
.await?,
true
);
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(0x80)), Lsn::INVALID, &ctx)
.await?,
relsize
);
@@ -2120,7 +2138,13 @@ mod tests {
let data = format!("foo blk {} at {}", blkno, lsn);
assert_eq!(
tline
.get_rel_page_at_lsn(TESTREL_A, blkno, Version::Lsn(Lsn(0x80)), false, &ctx)
.get_rel_page_at_lsn(
TESTREL_A,
blkno,
Version::Lsn(Lsn(0x80)),
Lsn::INVALID,
&ctx
)
.await?,
test_img(&data)
);
@@ -2154,7 +2178,7 @@ mod tests {
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
.await?,
RELSEG_SIZE + 1
);
@@ -2168,7 +2192,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
.await?,
RELSEG_SIZE
);
@@ -2183,7 +2207,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
.await?,
RELSEG_SIZE - 1
);
@@ -2201,7 +2225,7 @@ mod tests {
m.commit(&ctx).await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), false, &ctx)
.get_rel_size(TESTREL_A, Version::Lsn(Lsn(lsn)), Lsn::INVALID, &ctx)
.await?,
size as BlockNumber
);

View File

@@ -49,6 +49,8 @@ char *neon_auth_token;
int readahead_buffer_size = 128;
int flush_every_n_requests = 8;
int neon_protocol_version;
static int n_reconnect_attempts = 0;
static int max_reconnect_attempts = 60;
static int stripe_size;
@@ -844,6 +846,14 @@ pg_init_libpagestore(void)
PGC_USERSET,
0, /* no flags required */
NULL, (GucIntAssignHook) &readahead_buffer_resize, NULL);
DefineCustomIntVariable("neon.protocol_version",
"Version of compute<->page server protocol",
NULL,
&neon_protocol_version,
NEON_PROTOCOL_VERSION, 1, INT_MAX,
PGC_USERSET,
0, /* no flags required */
NULL, NULL, NULL);
relsize_hash_init();

View File

@@ -28,10 +28,17 @@
#define MAX_SHARDS 128
#define MAX_PAGESERVER_CONNSTRING_SIZE 256
/*
* Right now protocal version is not set to the server.
* So it is ciritical that format of existed commands is not changed.
* New protocl versions can just add new commands.
*/
#define NEON_PROTOCOL_VERSION 2
typedef enum
{
/* pagestore_client -> pagestore */
T_NeonExistsRequest = 0,
T_NeonExistsRequest = 10, /* new protocol message tags start from 10 */
T_NeonNblocksRequest,
T_NeonGetPageRequest,
T_NeonDbSizeRequest,
@@ -72,14 +79,20 @@ typedef enum {
/*
* supertype of all the Neon*Request structs below
*
* If 'latest' is true, we are requesting the latest page version, and 'lsn'
* In old version of Neon we have 'latest' flag indicating that we are requesting the latest page version, and 'lsn'
* is just a hint to the server that we know there are no versions of the page
* (or relation size, for exists/nblocks requests) later than the 'lsn'.
*
* But it doesn't work for hot-standby replica because it may be not at the latest LSN position.
* So we need to be able to specify upper boundary for LSN which page server can send to us.
* This is why 'latest' flag is replaced with 'horizon'. MAX_LSN=~0 value of 'horizon' means that we are requesting latest version.
* If we need version on exact LSN (for static RO replicas), 'horizon' should be set to 0: in this case range [lsn,lsn] is used by page server.
* Otherwise for hot-standby replica we specify in 'horizon' current replay position.
*/
typedef struct
{
NeonMessageTag tag;
bool latest; /* if true, request latest page version */
XLogRecPtr horizon; /* upper boundary for page LSN */
XLogRecPtr lsn; /* request page version @ this LSN */
} NeonRequest;
@@ -193,6 +206,7 @@ extern int readahead_buffer_size;
extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern int neon_protocol_version;
extern shardno_t get_shard_number(BufferTag* tag);

View File

@@ -110,6 +110,20 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
static bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
#define MAX_LSN ((XLogRecPtr)~0)
/*
* There are three kinds of get_page :
* 1. Master compute: get the latest page not older than specified LSN (horizon=Lsn::MAX)
* 2. RO replica: get the latest page not newer than current WAL position replica already applied (horizon=GetXLogReplayRecPtr(NULL))
* 3. Snapshot: get latest page not new than specified LSN (horizon=request_lsn)
*/
static XLogRecPtr
neon_get_horizon(bool latest)
{
return latest ? MAX_LSN : RecoveryInProgress() ? GetXLogReplayRecPtr(NULL) : InvalidXLogRecPtr; /* horizon=InvalidXlogRecPtr is replaced with request_lsn at PS */
}
/*
* Prefetch implementation:
*
@@ -687,9 +701,10 @@ static void
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
{
bool found;
bool latest;
NeonGetPageRequest request = {
.req.tag = T_NeonGetPageRequest,
.req.latest = false,
.req.horizon = 0,
.req.lsn = 0,
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
.forknum = slot->buftag.forkNum,
@@ -699,13 +714,13 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
if (force_lsn && force_latest)
{
request.req.lsn = *force_lsn;
request.req.latest = *force_latest;
latest = *force_latest;
slot->actual_request_lsn = slot->effective_request_lsn = *force_lsn;
}
else
{
XLogRecPtr lsn = neon_get_request_lsn(
&request.req.latest,
&latest,
BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum,
slot->buftag.blockNum
@@ -733,6 +748,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
prefetch_lsn = Max(prefetch_lsn, lsn);
slot->effective_request_lsn = prefetch_lsn;
}
request.req.horizon = neon_get_horizon(latest);
Assert(slot->response == NULL);
Assert(slot->my_ring_index == MyPState->ring_unused);
@@ -997,7 +1013,19 @@ nm_pack_request(NeonRequest *msg)
StringInfoData s;
initStringInfo(&s);
pq_sendbyte(&s, msg->tag);
if (neon_protocol_version >= 2)
{
pq_sendbyte(&s, msg->tag);
pq_sendint64(&s, msg->horizon);
}
else
{
/* Old protocol with latest flag */
pq_sendbyte(&s, msg->tag - T_NeonExistsRequest); /* old protocol command tags start from zero */
pq_sendbyte(&s, msg->horizon == MAX_LSN);
}
pq_sendint64(&s, msg->lsn);
switch (messageTag(msg))
{
@@ -1006,8 +1034,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
@@ -1019,8 +1045,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
@@ -1032,8 +1056,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, msg_req->dbNode);
break;
@@ -1042,8 +1064,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
pq_sendint32(&s, NInfoGetRelNumber(msg_req->rinfo));
@@ -1057,8 +1077,6 @@ nm_pack_request(NeonRequest *msg)
{
NeonGetSlruSegmentRequest *msg_req = (NeonGetSlruSegmentRequest *) msg;
pq_sendbyte(&s, msg_req->req.latest);
pq_sendint64(&s, msg_req->req.lsn);
pq_sendbyte(&s, msg_req->kind);
pq_sendint32(&s, msg_req->segno);
@@ -1209,7 +1227,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
appendStringInfoChar(&s, '}');
break;
}
@@ -1222,7 +1240,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
appendStringInfoChar(&s, '}');
break;
}
@@ -1236,7 +1254,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
appendStringInfoChar(&s, '}');
break;
}
@@ -1247,7 +1265,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
appendStringInfoChar(&s, '}');
break;
}
@@ -1259,7 +1277,7 @@ nm_to_string(NeonMessage *msg)
appendStringInfo(&s, ", \"kind\": %u", msg_req->kind);
appendStringInfo(&s, ", \"segno\": %u", msg_req->segno);
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
appendStringInfoChar(&s, '}');
break;
}
@@ -1664,7 +1682,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
{
NeonExistsRequest request = {
.req.tag = T_NeonExistsRequest,
.req.latest = latest,
.req.horizon = neon_get_horizon(latest),
.req.lsn = request_lsn,
.rinfo = InfoFromSMgrRel(reln),
.forknum = forkNum};
@@ -2474,7 +2492,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
{
NeonNblocksRequest request = {
.req.tag = T_NeonNblocksRequest,
.req.latest = latest,
.req.horizon = neon_get_horizon(latest),
.req.lsn = request_lsn,
.rinfo = InfoFromSMgrRel(reln),
.forknum = forknum,
@@ -2531,7 +2549,7 @@ neon_dbsize(Oid dbNode)
{
NeonDbSizeRequest request = {
.req.tag = T_NeonDbSizeRequest,
.req.latest = latest,
.req.horizon = neon_get_horizon(latest),
.req.lsn = request_lsn,
.dbNode = dbNode,
};
@@ -2827,7 +2845,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
NeonResponse *resp;
NeonGetSlruSegmentRequest request = {
.req.tag = T_NeonGetSlruSegmentRequest,
.req.latest = false,
.req.horizon = InvalidXLogRecPtr,
.req.lsn = request_lsn,
.kind = kind,
@@ -2980,7 +2998,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
NeonNblocksRequest request = {
.req = (NeonRequest) {
.lsn = end_recptr,
.latest = false,
.horizon = neon_get_horizon(false),
.tag = T_NeonNblocksRequest,
},
.rinfo = rinfo,

View File

@@ -97,7 +97,6 @@ native-tls.workspace = true
postgres-native-tls.workspace = true
postgres-protocol.workspace = true
redis.workspace = true
jsonwebtoken.workspace = true
workspace_hack.workspace = true

View File

@@ -13,8 +13,6 @@ mod password_hack;
pub use password_hack::parse_endpoint_param;
use password_hack::PasswordHackPayload;
pub mod caps;
mod flow;
pub use flow::*;
use tokio::time::error::Elapsed;
@@ -73,9 +71,6 @@ pub enum AuthErrorImpl {
#[error("Too many connections to this endpoint. Please try again later.")]
TooManyConnections,
#[error("neon_caps token is invalid")]
CapsInvalid,
#[error("Authentication timed out")]
UserTimeout(Elapsed),
}
@@ -101,10 +96,6 @@ impl AuthError {
AuthErrorImpl::TooManyConnections.into()
}
pub fn caps_invalid() -> Self {
AuthErrorImpl::CapsInvalid.into()
}
pub fn is_auth_failed(&self) -> bool {
matches!(self.0.as_ref(), AuthErrorImpl::AuthFailed(_))
}
@@ -135,7 +126,6 @@ impl UserFacingError for AuthError {
IpAddressNotAllowed(_) => self.to_string(),
TooManyConnections => self.to_string(),
UserTimeout(_) => self.to_string(),
CapsInvalid => self.to_string(),
}
}
}
@@ -155,7 +145,6 @@ impl ReportableError for AuthError {
IpAddressNotAllowed(_) => crate::error::ErrorKind::User,
TooManyConnections => crate::error::ErrorKind::RateLimit,
UserTimeout(_) => crate::error::ErrorKind::User,
CapsInvalid => crate::error::ErrorKind::User,
}
}
}

View File

@@ -28,7 +28,6 @@ use crate::{
stream, url,
};
use crate::{scram, EndpointCacheKey, EndpointId, RoleName};
use std::net::IpAddr;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
@@ -252,13 +251,11 @@ async fn auth_quirks(
Ok(info) => (info, None),
};
let bypass_ipcheck = apply_caps(&config, &info, &ctx.peer_addr)?;
info!("fetching user's authentication info");
let (allowed_ips, maybe_secret) = api.get_allowed_ips_and_secret(ctx, &info).await?;
// check allowed list
if !bypass_ipcheck && !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr));
}
let cached_secret = match maybe_secret {
@@ -540,7 +537,6 @@ mod tests {
scram_protocol_timeout: std::time::Duration::from_secs(5),
rate_limiter_enabled: true,
rate_limiter: AuthRateLimiter::new(&RateBucketInfo::DEFAULT_AUTH_SET),
caps: None,
});
async fn read_message(r: &mut (impl AsyncRead + Unpin), b: &mut BytesMut) -> PgMessage {
@@ -699,43 +695,3 @@ mod tests {
handle.await.unwrap();
}
}
// It checks that provided JWT capabilities are valid for the connection
//
// if it returns Ok(true), futher peer IP checks has to be disabled
//
// If proxy isn't configured for JWT capabilities or neon_caps option
// isn't set, it skips any checks
pub fn apply_caps(
config: &AuthenticationConfig,
info: &ComputeUserInfo,
peer_addr: &IpAddr,
) -> auth::Result<bool> {
match (&config.caps, info.options.caps()) {
(Some(caps_config), Some(caps)) => {
let token = match caps_config.decode(&caps) {
Err(_) => {
return Err(auth::AuthError::caps_invalid());
}
Ok(token) => token,
};
if token.claims.endpoint_id != *info.endpoint {
return Err(auth::AuthError::caps_invalid());
}
match token.claims.check_ip(peer_addr) {
None => return Ok(false),
Some(true) => {
return Ok(true);
}
Some(false) => {
return Err(auth::AuthError::ip_address_not_allowed(*peer_addr));
}
}
}
_ => {
return Ok(false);
}
}
}

View File

@@ -1,96 +0,0 @@
use std::{borrow::Cow, fmt::Display, fs, net::IpAddr};
use anyhow::Result;
use camino::Utf8Path;
use jsonwebtoken::{decode, Algorithm, DecodingKey, TokenData, Validation};
use serde::{Deserialize, Serialize};
use utils::http::error::ApiError;
use super::{check_peer_addr_is_in_list, IpPattern};
const TOKEN_ALGORITHM: Algorithm = Algorithm::EdDSA;
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Scope {
Connection,
}
#[derive(Debug, Deserialize, Clone, PartialEq)]
pub struct Claims {
pub scope: Scope,
pub allowed_ips: Option<Vec<IpPattern>>,
pub endpoint_id: String,
}
impl Claims {
pub fn check_ip(&self, ip: &IpAddr) -> Option<bool> {
let allowed_ips = match &self.allowed_ips {
None => return None,
Some(allowed_ips) => allowed_ips,
};
if allowed_ips.is_empty() {
return Some(true);
}
return Some(check_peer_addr_is_in_list(ip, &allowed_ips));
}
}
pub struct CapsValidator {
decoding_key: DecodingKey,
validation: Validation,
}
impl CapsValidator {
pub fn new(decoding_key: DecodingKey) -> Self {
let mut validation = Validation::default();
validation.algorithms = vec![TOKEN_ALGORITHM];
Self {
decoding_key,
validation,
}
}
pub fn from_key_path(key_path: &Utf8Path) -> Result<Self> {
let metadata = key_path.metadata()?;
let decoding_key = if metadata.is_file() {
let public_key = fs::read(key_path)?;
DecodingKey::from_ed_pem(&public_key)?
} else {
anyhow::bail!("path isn't a file")
};
Ok(Self::new(decoding_key))
}
pub fn decode(&self, token: &str) -> std::result::Result<TokenData<Claims>, CapsError> {
return match decode(token, &self.decoding_key, &self.validation) {
Ok(res) => Ok(res),
Err(e) => Err(CapsError(Cow::Owned(e.to_string()))),
};
}
}
impl std::fmt::Debug for CapsValidator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CapsValidator")
.field("validation", &self.validation)
.finish()
}
}
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct CapsError(pub Cow<'static, str>);
impl Display for CapsError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<CapsError> for ApiError {
fn from(_value: CapsError) -> Self {
ApiError::Forbidden("neon_caps validation error".to_string())
}
}

View File

@@ -5,11 +5,9 @@ use aws_config::meta::region::RegionProviderChain;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::provider_config::ProviderConfig;
use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider;
use camino::Utf8Path;
use futures::future::Either;
use proxy::auth;
use proxy::auth::backend::MaybeOwned;
use proxy::auth::caps::CapsValidator;
use proxy::cancellation::CancelMap;
use proxy::cancellation::CancellationHandler;
use proxy::config::remote_storage_from_toml;
@@ -195,9 +193,6 @@ struct ProxyCliArgs {
#[clap(flatten)]
parquet_upload: ParquetUploadArgs,
#[clap(long)]
caps_key: Option<String>,
/// interval for backup metric collection
#[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
metric_backup_collection_interval: std::time::Duration,
@@ -547,19 +542,10 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns,
},
};
let caps = if let Some(key) = &args.caps_key {
let path = Utf8Path::new(key);
Some(CapsValidator::from_key_path(path)?);
} else {
None;
};
let authentication_config = AuthenticationConfig {
scram_protocol_timeout: args.scram_protocol_timeout,
rate_limiter_enabled: args.auth_rate_limit_enabled,
rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
caps,
};
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();

View File

@@ -1,5 +1,5 @@
use crate::{
auth::{self, caps::CapsValidator},
auth,
rate_limiter::{AuthRateLimiter, RateBucketInfo},
serverless::GlobalConnPoolOptions,
};
@@ -58,7 +58,6 @@ pub struct AuthenticationConfig {
pub scram_protocol_timeout: tokio::time::Duration,
pub rate_limiter_enabled: bool,
pub rate_limiter: AuthRateLimiter,
pub caps: Option<CapsValidator>,
}
impl TlsConfig {

View File

@@ -385,10 +385,6 @@ impl NeonOptions {
!self.0.is_empty()
}
pub fn caps(&self) -> Option<&str> {
self.0.iter().find(|(k, _)| k == "caps").map(|(_, v)| &**v)
}
fn parse_from_iter<'a>(options: impl Iterator<Item = &'a str>) -> Self {
let mut options = options
.filter_map(neon_option)
@@ -402,13 +398,7 @@ impl NeonOptions {
// prefix + format!(" {k}:{v}")
// kinda jank because SmolStr is immutable
std::iter::once(prefix)
// exclude caps from cache key
.chain(
self.0
.iter()
.filter(|(k, _)| k != "caps")
.flat_map(|(k, v)| [" ", &**k, ":", &**v]),
)
.chain(self.0.iter().flat_map(|(k, v)| [" ", &**k, ":", &**v]))
.collect::<SmolStr>()
.into()
}

View File

@@ -4,10 +4,7 @@ use async_trait::async_trait;
use tracing::{field::display, info};
use crate::{
auth::{
backend::{apply_caps, ComputeCredentials},
check_peer_addr_is_in_list, AuthError,
},
auth::{backend::ComputeCredentials, check_peer_addr_is_in_list, AuthError},
compute,
config::ProxyConfig,
console::{
@@ -34,15 +31,8 @@ impl PoolingBackend {
) -> Result<ComputeCredentials, AuthError> {
let user_info = conn_info.user_info.clone();
let backend = self.config.auth_backend.as_ref().map(|_| user_info.clone());
let bypass_ipcheck = apply_caps(
&&self.config.authentication_config,
&user_info,
&ctx.peer_addr,
)?;
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
if !bypass_ipcheck && !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
if !check_peer_addr_is_in_list(&ctx.peer_addr, &allowed_ips) {
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr));
}
let cached_secret = match maybe_secret {

View File

@@ -0,0 +1,9 @@
from fixtures.neon_fixtures import NeonEnv
def test_protocol_version(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main", config_lines=["neon.protocol_version=1"])
cur = endpoint.connect().cursor()
cur.execute("show neon.protocol_version")
assert cur.fetchone() == ("1",)