mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
Replace latest with horizon in get_page request
This commit is contained in:
@@ -697,21 +697,21 @@ impl TryFrom<u8> for PagestreamBeMessageTag {
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamExistsRequest {
|
||||
pub latest: bool,
|
||||
pub horizon: Lsn,
|
||||
pub lsn: Lsn,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamNblocksRequest {
|
||||
pub latest: bool,
|
||||
pub horizon: Lsn,
|
||||
pub lsn: Lsn,
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamGetPageRequest {
|
||||
pub latest: bool,
|
||||
pub horizon: Lsn,
|
||||
pub lsn: Lsn,
|
||||
pub rel: RelTag,
|
||||
pub blkno: u32,
|
||||
@@ -719,7 +719,7 @@ pub struct PagestreamGetPageRequest {
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct PagestreamDbSizeRequest {
|
||||
pub latest: bool,
|
||||
pub horizon: Lsn,
|
||||
pub lsn: Lsn,
|
||||
pub dbnode: u32,
|
||||
}
|
||||
@@ -780,7 +780,7 @@ impl PagestreamFeMessage {
|
||||
match self {
|
||||
Self::Exists(req) => {
|
||||
bytes.put_u8(0);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.horizon.0);
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
@@ -790,7 +790,7 @@ impl PagestreamFeMessage {
|
||||
|
||||
Self::Nblocks(req) => {
|
||||
bytes.put_u8(1);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.horizon.0);
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
@@ -800,7 +800,7 @@ impl PagestreamFeMessage {
|
||||
|
||||
Self::GetPage(req) => {
|
||||
bytes.put_u8(2);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.horizon.0);
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.rel.spcnode);
|
||||
bytes.put_u32(req.rel.dbnode);
|
||||
@@ -811,7 +811,7 @@ impl PagestreamFeMessage {
|
||||
|
||||
Self::DbSize(req) => {
|
||||
bytes.put_u8(3);
|
||||
bytes.put_u8(u8::from(req.latest));
|
||||
bytes.put_u64(req.horizon.0);
|
||||
bytes.put_u64(req.lsn.0);
|
||||
bytes.put_u32(req.dbnode);
|
||||
}
|
||||
@@ -838,7 +838,7 @@ impl PagestreamFeMessage {
|
||||
let msg_tag = body.read_u8()?;
|
||||
match msg_tag {
|
||||
0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
@@ -848,7 +848,7 @@ impl PagestreamFeMessage {
|
||||
},
|
||||
})),
|
||||
1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
@@ -858,7 +858,7 @@ impl PagestreamFeMessage {
|
||||
},
|
||||
})),
|
||||
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
rel: RelTag {
|
||||
spcnode: body.read_u32::<BigEndian>()?,
|
||||
@@ -869,7 +869,7 @@ impl PagestreamFeMessage {
|
||||
blkno: body.read_u32::<BigEndian>()?,
|
||||
})),
|
||||
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||
latest: body.read_u8()? != 0,
|
||||
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
|
||||
dbnode: body.read_u32::<BigEndian>()?,
|
||||
})),
|
||||
@@ -1005,7 +1005,7 @@ mod tests {
|
||||
// Test serialization/deserialization of PagestreamFeMessage
|
||||
let messages = vec![
|
||||
PagestreamFeMessage::Exists(PagestreamExistsRequest {
|
||||
latest: true,
|
||||
horizon: Lsn.MAX,
|
||||
lsn: Lsn(4),
|
||||
rel: RelTag {
|
||||
forknum: 1,
|
||||
@@ -1015,7 +1015,7 @@ mod tests {
|
||||
},
|
||||
}),
|
||||
PagestreamFeMessage::Nblocks(PagestreamNblocksRequest {
|
||||
latest: false,
|
||||
horizon: Lsn(4),
|
||||
lsn: Lsn(4),
|
||||
rel: RelTag {
|
||||
forknum: 1,
|
||||
@@ -1025,7 +1025,7 @@ mod tests {
|
||||
},
|
||||
}),
|
||||
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
|
||||
latest: true,
|
||||
latest: Lsn.MAX,
|
||||
lsn: Lsn(4),
|
||||
rel: RelTag {
|
||||
forknum: 1,
|
||||
@@ -1036,7 +1036,7 @@ mod tests {
|
||||
blkno: 7,
|
||||
}),
|
||||
PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
|
||||
latest: true,
|
||||
latest: Lsn::MAX,
|
||||
lsn: Lsn(4),
|
||||
dbnode: 7,
|
||||
}),
|
||||
|
||||
@@ -52,7 +52,7 @@ pub struct SkTimelineInfo {
|
||||
pub http_connstr: Option<String>,
|
||||
// Minimum of all active RO replicas flush LSN
|
||||
#[serde(default = "lsn_invalid")]
|
||||
pub standby_flush_lsn: Lsn,
|
||||
pub standby_horizon: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
|
||||
@@ -288,7 +288,11 @@ async fn main_impl(
|
||||
num_client: rng.gen_range(0..args.num_clients.get()),
|
||||
},
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
horizon: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
lsn: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
@@ -335,7 +339,11 @@ async fn main_impl(
|
||||
let (rel_tag, block_no) = key_to_rel_block(key)
|
||||
.expect("we filter non-rel-block keys out above");
|
||||
PagestreamGetPageRequest {
|
||||
latest: rng.gen_bool(args.req_latest_probability),
|
||||
horizon: if rng.gen_bool(args.req_latest_probability) {
|
||||
Lsn::MAX
|
||||
} else {
|
||||
r.timeline_lsn
|
||||
},
|
||||
lsn: r.timeline_lsn,
|
||||
rel: rel_tag,
|
||||
blkno: block_no,
|
||||
|
||||
@@ -357,9 +357,10 @@ where
|
||||
|
||||
/// Add contents of relfilenode `src`, naming it as `dst`.
|
||||
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
|
||||
let horizon = self.lsn; // we do not need latest version
|
||||
let nblocks = self
|
||||
.timeline
|
||||
.get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx)
|
||||
.get_rel_size(src, Version::Lsn(self.lsn), horizon, self.ctx)
|
||||
.await?;
|
||||
|
||||
// If the relation is empty, create an empty file
|
||||
@@ -380,7 +381,7 @@ where
|
||||
for blknum in startblk..endblk {
|
||||
let img = self
|
||||
.timeline
|
||||
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx)
|
||||
.get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), horizon, self.ctx)
|
||||
.await?;
|
||||
segment_data.extend_from_slice(&img[..]);
|
||||
}
|
||||
|
||||
@@ -842,55 +842,34 @@ impl PageServerHandler {
|
||||
/// the LSN that should be used to look up the page versions.
|
||||
async fn wait_or_get_last_lsn(
|
||||
timeline: &Timeline,
|
||||
mut lsn: Lsn,
|
||||
latest: bool,
|
||||
lsn: Lsn,
|
||||
horizon: Lsn,
|
||||
latest_gc_cutoff_lsn: &RcuReadGuard<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Lsn, PageStreamError> {
|
||||
if latest {
|
||||
// Latest page version was requested. If LSN is given, it is a hint
|
||||
// to the page server that there have been no modifications to the
|
||||
// page after that LSN. If we haven't received WAL up to that point,
|
||||
// wait until it arrives.
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
|
||||
// Note: this covers the special case that lsn == Lsn(0). That
|
||||
// special case means "return the latest version whatever it is",
|
||||
// and it's used for bootstrapping purposes, when the page server is
|
||||
// connected directly to the compute node. That is needed because
|
||||
// when you connect to the compute node, to receive the WAL, the
|
||||
// walsender process will do a look up in the pg_authid catalog
|
||||
// table for authentication. That poses a deadlock problem: the
|
||||
// catalog table lookup will send a GetPage request, but the GetPage
|
||||
// request will block in the page server because the recent WAL
|
||||
// hasn't been received yet, and it cannot be received until the
|
||||
// walsender completes the authentication and starts streaming the
|
||||
// WAL.
|
||||
if lsn <= last_record_lsn {
|
||||
lsn = last_record_lsn;
|
||||
} else {
|
||||
timeline.wait_lsn(lsn, ctx).await?;
|
||||
// Since we waited for 'lsn' to arrive, that is now the last
|
||||
// record LSN. (Or close enough for our purposes; the
|
||||
// last-record LSN can advance immediately after we return
|
||||
// anyway)
|
||||
}
|
||||
let last_record_lsn = timeline.get_last_record_lsn();
|
||||
let effective_lsn = Lsn::max(lsn, Lsn::min(horizon, last_record_lsn));
|
||||
if effective_lsn > last_record_lsn {
|
||||
timeline.wait_lsn(effective_lsn, ctx).await?;
|
||||
// Since we waited for 'lsn' to arrive, that is now the last
|
||||
// record LSN. (Or close enough for our purposes; the
|
||||
// last-record LSN can advance immediately after we return
|
||||
// anyway)
|
||||
} else {
|
||||
if lsn == Lsn(0) {
|
||||
if effective_lsn == Lsn(0) {
|
||||
return Err(PageStreamError::BadRequest(
|
||||
"invalid LSN(0) in request".into(),
|
||||
));
|
||||
}
|
||||
timeline.wait_lsn(lsn, ctx).await?;
|
||||
}
|
||||
|
||||
if lsn < **latest_gc_cutoff_lsn {
|
||||
if effective_lsn < **latest_gc_cutoff_lsn {
|
||||
return Err(PageStreamError::BadRequest(format!(
|
||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||
lsn, **latest_gc_cutoff_lsn
|
||||
effective_lsn, **latest_gc_cutoff_lsn
|
||||
).into()));
|
||||
}
|
||||
Ok(lsn)
|
||||
Ok(effective_lsn)
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
@@ -908,11 +887,11 @@ impl PageServerHandler {
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let exists = timeline
|
||||
.get_rel_exists(req.rel, Version::Lsn(lsn), req.latest, ctx)
|
||||
.get_rel_exists(req.rel, Version::Lsn(lsn), req.horizon, ctx)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
|
||||
@@ -936,11 +915,11 @@ impl PageServerHandler {
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let n_blocks = timeline
|
||||
.get_rel_size(req.rel, Version::Lsn(lsn), req.latest, ctx)
|
||||
.get_rel_size(req.rel, Version::Lsn(lsn), req.horizon, ctx)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
|
||||
@@ -964,7 +943,7 @@ impl PageServerHandler {
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let total_blocks = timeline
|
||||
@@ -972,7 +951,7 @@ impl PageServerHandler {
|
||||
DEFAULTTABLESPACE_OID,
|
||||
req.dbnode,
|
||||
Version::Lsn(lsn),
|
||||
req.latest,
|
||||
req.horizon,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1142,11 +1121,11 @@ impl PageServerHandler {
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn =
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn, ctx)
|
||||
Self::wait_or_get_last_lsn(timeline, req.lsn, req.horizon, &latest_gc_cutoff_lsn, ctx)
|
||||
.await?;
|
||||
|
||||
let page = timeline
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.latest, ctx)
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), req.horizon, ctx)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
|
||||
@@ -169,7 +169,7 @@ impl Timeline {
|
||||
tag: RelTag,
|
||||
blknum: BlockNumber,
|
||||
version: Version<'_>,
|
||||
latest: bool,
|
||||
horizon: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
@@ -178,7 +178,7 @@ impl Timeline {
|
||||
));
|
||||
}
|
||||
|
||||
let nblocks = self.get_rel_size(tag, version, latest, ctx).await?;
|
||||
let nblocks = self.get_rel_size(tag, version, horizon, ctx).await?;
|
||||
if blknum >= nblocks {
|
||||
debug!(
|
||||
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
|
||||
@@ -200,7 +200,7 @@ impl Timeline {
|
||||
spcnode: Oid,
|
||||
dbnode: Oid,
|
||||
version: Version<'_>,
|
||||
latest: bool,
|
||||
horizon: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<usize, PageReconstructError> {
|
||||
let mut total_blocks = 0;
|
||||
@@ -208,7 +208,7 @@ impl Timeline {
|
||||
let rels = self.list_rels(spcnode, dbnode, version, ctx).await?;
|
||||
|
||||
for rel in rels {
|
||||
let n_blocks = self.get_rel_size(rel, version, latest, ctx).await?;
|
||||
let n_blocks = self.get_rel_size(rel, version, horizon, ctx).await?;
|
||||
total_blocks += n_blocks as usize;
|
||||
}
|
||||
Ok(total_blocks)
|
||||
@@ -219,7 +219,7 @@ impl Timeline {
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
latest: bool,
|
||||
horizon: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockNumber, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
@@ -233,7 +233,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
|
||||
&& !self.get_rel_exists(tag, version, latest, ctx).await?
|
||||
&& !self.get_rel_exists(tag, version, horizon, ctx).await?
|
||||
{
|
||||
// FIXME: Postgres sometimes calls smgrcreate() to create
|
||||
// FSM, and smgrnblocks() on it immediately afterwards,
|
||||
@@ -246,14 +246,8 @@ impl Timeline {
|
||||
let mut buf = version.get(self, key, ctx).await?;
|
||||
let nblocks = buf.get_u32_le();
|
||||
|
||||
if latest {
|
||||
// Update relation size cache only if "latest" flag is set.
|
||||
// This flag is set by compute when it is working with most recent version of relation.
|
||||
// Typically master compute node always set latest=true.
|
||||
// Please notice, that even if compute node "by mistake" specifies old LSN but set
|
||||
// latest=true, then it can not cause cache corruption, because with latest=true
|
||||
// pageserver choose max(request_lsn, last_written_lsn) and so cached value will be
|
||||
// associated with most recent value of LSN.
|
||||
if horizon == Lsn::MAX {
|
||||
// Update relation size cache only if latest version is requested.
|
||||
self.update_cached_rel_size(tag, version.get_lsn(), nblocks);
|
||||
}
|
||||
Ok(nblocks)
|
||||
@@ -264,7 +258,7 @@ impl Timeline {
|
||||
&self,
|
||||
tag: RelTag,
|
||||
version: Version<'_>,
|
||||
_latest: bool,
|
||||
_horizon: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, PageReconstructError> {
|
||||
if tag.relnode == 0 {
|
||||
@@ -1066,7 +1060,7 @@ impl<'a> DatadirModification<'a> {
|
||||
) -> anyhow::Result<()> {
|
||||
let total_blocks = self
|
||||
.tline
|
||||
.get_db_size(spcnode, dbnode, Version::Modified(self), true, ctx)
|
||||
.get_db_size(spcnode, dbnode, Version::Modified(self), Lsn::MAX, ctx)
|
||||
.await?;
|
||||
|
||||
// Remove entry from dbdir
|
||||
@@ -1157,7 +1151,7 @@ impl<'a> DatadirModification<'a> {
|
||||
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
|
||||
if self
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(self), true, ctx)
|
||||
.get_rel_exists(rel, Version::Modified(self), Lsn::MAX, ctx)
|
||||
.await?
|
||||
{
|
||||
let size_key = rel_size_to_key(rel);
|
||||
|
||||
@@ -215,7 +215,7 @@ pub struct Timeline {
|
||||
// Atomic would be more appropriate here.
|
||||
last_freeze_ts: RwLock<Instant>,
|
||||
|
||||
pub(crate) standby_flush_lsn: AtomicLsn,
|
||||
pub(crate) standby_horizon: AtomicLsn,
|
||||
|
||||
// WAL redo manager. `None` only for broken tenants.
|
||||
walredo_mgr: Option<Arc<super::WalRedoManager>>,
|
||||
@@ -1542,7 +1542,7 @@ impl Timeline {
|
||||
|
||||
compaction_lock: tokio::sync::Mutex::default(),
|
||||
gc_lock: tokio::sync::Mutex::default(),
|
||||
standby_flush_lsn: AtomicLsn::new(0),
|
||||
standby_horizon: AtomicLsn::new(0),
|
||||
};
|
||||
result.repartition_threshold =
|
||||
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
|
||||
@@ -4202,9 +4202,9 @@ impl Timeline {
|
||||
};
|
||||
|
||||
let new_gc_cutoff = Lsn::min(horizon_cutoff, pitr_cutoff);
|
||||
let standby_flush_lsn = self.standby_flush_lsn.load();
|
||||
let new_gc_cutoff = if standby_flush_lsn != Lsn::INVALID {
|
||||
Lsn::min(standby_flush_lsn, new_gc_cutoff)
|
||||
let standby_horizon = self.standby_horizon.load();
|
||||
let new_gc_cutoff = if standby_horizon != Lsn::INVALID {
|
||||
Lsn::min(standby_horizon, new_gc_cutoff)
|
||||
} else {
|
||||
new_gc_cutoff
|
||||
};
|
||||
|
||||
@@ -554,8 +554,8 @@ impl ConnectionManagerState {
|
||||
WALRECEIVER_BROKER_UPDATES.inc();
|
||||
|
||||
self.timeline
|
||||
.standby_flush_lsn
|
||||
.store(Lsn(timeline_update.standby_flush_lsn));
|
||||
.standby_horizon
|
||||
.store(Lsn(timeline_update.standby_horizon));
|
||||
|
||||
let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
|
||||
let old_entry = self.wal_stream_candidates.insert(
|
||||
@@ -923,7 +923,7 @@ mod tests {
|
||||
remote_consistent_lsn: 0,
|
||||
peer_horizon_lsn: 0,
|
||||
local_start_lsn: 0,
|
||||
standby_flush_lsn: 0,
|
||||
standby_horizon: 0,
|
||||
safekeeper_connstr: safekeeper_connstr.to_owned(),
|
||||
http_connstr: safekeeper_connstr.to_owned(),
|
||||
availability_zone: None,
|
||||
|
||||
@@ -1019,7 +1019,7 @@ impl WalIngest {
|
||||
|
||||
let nblocks = modification
|
||||
.tline
|
||||
.get_rel_size(src_rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_size(src_rel, Version::Modified(modification), Lsn::MAX, ctx)
|
||||
.await?;
|
||||
let dst_rel = RelTag {
|
||||
spcnode: tablespace_id,
|
||||
@@ -1057,7 +1057,7 @@ impl WalIngest {
|
||||
src_rel,
|
||||
blknum,
|
||||
Version::Modified(modification),
|
||||
true,
|
||||
Lsn::MAX,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1227,7 +1227,7 @@ impl WalIngest {
|
||||
};
|
||||
if modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
|
||||
.await?
|
||||
{
|
||||
self.put_rel_drop(modification, rel, ctx).await?;
|
||||
@@ -1526,7 +1526,7 @@ impl WalIngest {
|
||||
nblocks
|
||||
} else if !modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
|
||||
.await?
|
||||
{
|
||||
// create it with 0 size initially, the logic below will extend it
|
||||
@@ -1538,7 +1538,7 @@ impl WalIngest {
|
||||
} else {
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx)
|
||||
.await?
|
||||
};
|
||||
|
||||
@@ -1635,14 +1635,14 @@ async fn get_relsize(
|
||||
) -> anyhow::Result<BlockNumber> {
|
||||
let nblocks = if !modification
|
||||
.tline
|
||||
.get_rel_exists(rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_exists(rel, Version::Modified(modification), Lsn::MAX, ctx)
|
||||
.await?
|
||||
{
|
||||
0
|
||||
} else {
|
||||
modification
|
||||
.tline
|
||||
.get_rel_size(rel, Version::Modified(modification), true, ctx)
|
||||
.get_rel_size(rel, Version::Modified(modification), Lsn::MAX, ctx)
|
||||
.await?
|
||||
};
|
||||
Ok(nblocks)
|
||||
|
||||
@@ -79,7 +79,7 @@ typedef enum {
|
||||
typedef struct
|
||||
{
|
||||
NeonMessageTag tag;
|
||||
bool latest; /* if true, request latest page version */
|
||||
XLogRecPtr horizon; /* uppe boundary for page LSN */
|
||||
XLogRecPtr lsn; /* request page version @ this LSN */
|
||||
} NeonRequest;
|
||||
|
||||
|
||||
@@ -110,6 +110,14 @@ static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
|
||||
static bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
|
||||
|
||||
#define MAX_LSN ((XLogRecPtr)~0)
|
||||
|
||||
static XLogRecPtr
|
||||
neon_get_horizon(bool latest)
|
||||
{
|
||||
return latest ? MAX_LSN : GetXLogReplayRecPtr(NULL);
|
||||
}
|
||||
|
||||
/*
|
||||
* Prefetch implementation:
|
||||
*
|
||||
@@ -680,9 +688,10 @@ static void
|
||||
prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force_lsn)
|
||||
{
|
||||
bool found;
|
||||
bool latest;
|
||||
NeonGetPageRequest request = {
|
||||
.req.tag = T_NeonGetPageRequest,
|
||||
.req.latest = false,
|
||||
.req.horizon = 0,
|
||||
.req.lsn = 0,
|
||||
.rinfo = BufTagGetNRelFileInfo(slot->buftag),
|
||||
.forknum = slot->buftag.forkNum,
|
||||
@@ -692,13 +701,13 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
|
||||
if (force_lsn && force_latest)
|
||||
{
|
||||
request.req.lsn = *force_lsn;
|
||||
request.req.latest = *force_latest;
|
||||
latest = *force_latest;
|
||||
slot->actual_request_lsn = slot->effective_request_lsn = *force_lsn;
|
||||
}
|
||||
else
|
||||
{
|
||||
XLogRecPtr lsn = neon_get_request_lsn(
|
||||
&request.req.latest,
|
||||
&latest,
|
||||
BufTagGetNRelFileInfo(slot->buftag),
|
||||
slot->buftag.forkNum,
|
||||
slot->buftag.blockNum
|
||||
@@ -726,6 +735,7 @@ prefetch_do_request(PrefetchRequest *slot, bool *force_latest, XLogRecPtr *force
|
||||
prefetch_lsn = Max(prefetch_lsn, lsn);
|
||||
slot->effective_request_lsn = prefetch_lsn;
|
||||
}
|
||||
request.req.horizon = neon_get_horizon(latest);
|
||||
|
||||
Assert(slot->response == NULL);
|
||||
Assert(slot->my_ring_index == MyPState->ring_unused);
|
||||
@@ -996,7 +1006,7 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonExistsRequest *msg_req = (NeonExistsRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.horizon);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
||||
@@ -1009,7 +1019,7 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonNblocksRequest *msg_req = (NeonNblocksRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.horizon);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
||||
@@ -1022,7 +1032,7 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonDbSizeRequest *msg_req = (NeonDbSizeRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.horizon);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, msg_req->dbNode);
|
||||
|
||||
@@ -1032,7 +1042,7 @@ nm_pack_request(NeonRequest *msg)
|
||||
{
|
||||
NeonGetPageRequest *msg_req = (NeonGetPageRequest *) msg;
|
||||
|
||||
pq_sendbyte(&s, msg_req->req.latest);
|
||||
pq_sendint64(&s, msg_req->req.horizon);
|
||||
pq_sendint64(&s, msg_req->req.lsn);
|
||||
pq_sendint32(&s, NInfoGetSpcOid(msg_req->rinfo));
|
||||
pq_sendint32(&s, NInfoGetDbOid(msg_req->rinfo));
|
||||
@@ -1199,7 +1209,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
|
||||
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1212,7 +1222,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfo(&s, ", \"rinfo\": \"%u/%u/%u\"", RelFileInfoFmt(msg_req->rinfo));
|
||||
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1226,7 +1236,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfo(&s, ", \"forknum\": %d", msg_req->forknum);
|
||||
appendStringInfo(&s, ", \"blkno\": %u", msg_req->blkno);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1237,7 +1247,7 @@ nm_to_string(NeonMessage *msg)
|
||||
appendStringInfoString(&s, "{\"type\": \"NeonDbSizeRequest\"");
|
||||
appendStringInfo(&s, ", \"dbnode\": \"%u\"", msg_req->dbNode);
|
||||
appendStringInfo(&s, ", \"lsn\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.lsn));
|
||||
appendStringInfo(&s, ", \"latest\": %d", msg_req->req.latest);
|
||||
appendStringInfo(&s, ", \"horizon\": \"%X/%X\"", LSN_FORMAT_ARGS(msg_req->req.horizon));
|
||||
appendStringInfoChar(&s, '}');
|
||||
break;
|
||||
}
|
||||
@@ -1654,7 +1664,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
{
|
||||
NeonExistsRequest request = {
|
||||
.req.tag = T_NeonExistsRequest,
|
||||
.req.latest = latest,
|
||||
.req.horizon = neon_get_horizon(latest),
|
||||
.req.lsn = request_lsn,
|
||||
.rinfo = InfoFromSMgrRel(reln),
|
||||
.forknum = forkNum};
|
||||
@@ -2463,7 +2473,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
{
|
||||
NeonNblocksRequest request = {
|
||||
.req.tag = T_NeonNblocksRequest,
|
||||
.req.latest = latest,
|
||||
.req.horizon = neon_get_horizon(latest),
|
||||
.req.lsn = request_lsn,
|
||||
.rinfo = InfoFromSMgrRel(reln),
|
||||
.forknum = forknum,
|
||||
@@ -2520,7 +2530,7 @@ neon_dbsize(Oid dbNode)
|
||||
{
|
||||
NeonDbSizeRequest request = {
|
||||
.req.tag = T_NeonDbSizeRequest,
|
||||
.req.latest = latest,
|
||||
.req.horizon = neon_get_horizon(latest),
|
||||
.req.lsn = request_lsn,
|
||||
.dbNode = dbNode,
|
||||
};
|
||||
@@ -2969,7 +2979,7 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
NeonNblocksRequest request = {
|
||||
.req = (NeonRequest) {
|
||||
.lsn = end_recptr,
|
||||
.latest = false,
|
||||
.horizon = neon_get_horizon(false),
|
||||
.tag = T_NeonNblocksRequest,
|
||||
},
|
||||
.rinfo = rinfo,
|
||||
|
||||
@@ -350,7 +350,7 @@ async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<B
|
||||
backup_lsn: sk_info.backup_lsn.0,
|
||||
local_start_lsn: sk_info.local_start_lsn.0,
|
||||
availability_zone: None,
|
||||
standby_flush_lsn: sk_info.standby_flush_lsn.0,
|
||||
standby_horizon: sk_info.standby_horizon.0,
|
||||
};
|
||||
|
||||
let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
|
||||
|
||||
@@ -124,7 +124,7 @@ pub struct TimelineMemState {
|
||||
pub remote_consistent_lsn: Lsn,
|
||||
#[serde(with = "hex")]
|
||||
pub proposer_uuid: PgUuid,
|
||||
pub standby_flush_lsn: Lsn,
|
||||
pub standby_horizon: Lsn,
|
||||
}
|
||||
|
||||
/// Safekeeper persistent state plus in memory layer, to avoid frequent fsyncs
|
||||
@@ -149,7 +149,7 @@ where
|
||||
peer_horizon_lsn: state.peer_horizon_lsn,
|
||||
remote_consistent_lsn: state.remote_consistent_lsn,
|
||||
proposer_uuid: state.proposer_uuid,
|
||||
standby_flush_lsn: Lsn::INVALID,
|
||||
standby_horizon: Lsn::INVALID,
|
||||
},
|
||||
pers: state,
|
||||
}
|
||||
|
||||
@@ -270,7 +270,7 @@ impl SharedState {
|
||||
backup_lsn: self.sk.state.inmem.backup_lsn.0,
|
||||
local_start_lsn: self.sk.state.local_start_lsn.0,
|
||||
availability_zone: conf.availability_zone.clone(),
|
||||
standby_flush_lsn: self.sk.state.inmem.standby_flush_lsn.0,
|
||||
standby_horizon: self.sk.state.inmem.standby_horizon.0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -632,7 +632,7 @@ impl Timeline {
|
||||
let (ps_feedback, standby_feedback) = self.walsenders.get_feedbacks();
|
||||
resp.hs_feedback = standby_feedback.hs_feedback;
|
||||
resp.pageserver_feedback = ps_feedback;
|
||||
shared_state.sk.state.inmem.standby_flush_lsn = standby_feedback.reply.flush_lsn;
|
||||
shared_state.sk.state.inmem.standby_horizon = standby_feedback.reply.apply_lsn;
|
||||
}
|
||||
|
||||
commit_lsn = shared_state.sk.state.inmem.commit_lsn;
|
||||
|
||||
@@ -147,7 +147,7 @@ async fn publish(client: Option<BrokerClientChannel>, n_keys: u64) {
|
||||
http_connstr: "zenith-1-sk-1.local:7677".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
standby_flush_lsn: 0,
|
||||
standby_horizon: 0,
|
||||
};
|
||||
counter += 1;
|
||||
yield info;
|
||||
|
||||
@@ -42,7 +42,7 @@ message SafekeeperTimelineInfo {
|
||||
uint64 remote_consistent_lsn = 7;
|
||||
uint64 peer_horizon_lsn = 8;
|
||||
uint64 local_start_lsn = 9;
|
||||
uint64 standby_flush_lsn = 14;
|
||||
uint64 standby_horizon = 14;
|
||||
// A connection string to use for WAL receiving.
|
||||
string safekeeper_connstr = 10;
|
||||
// HTTP endpoint connection string
|
||||
|
||||
@@ -734,7 +734,7 @@ mod tests {
|
||||
http_connstr: "neon-1-sk-1.local:7677".to_owned(),
|
||||
local_start_lsn: 0,
|
||||
availability_zone: None,
|
||||
standby_flush_lsn: 0,
|
||||
standby_horizon: 0,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user