Prevent flip-flop of standby_horizon by ignoring 0 LSN and resetting it after each GC iteration

This commit is contained in:
Konstantin Knizhnik
2024-01-30 11:10:40 +02:00
parent 22a0a6ba0c
commit f24b898bef
13 changed files with 104 additions and 39 deletions

View File

@@ -654,9 +654,10 @@ pub struct WalRedoManagerStatus {
pub enum PagestreamFeMessage {
Exists(PagestreamExistsRequest),
Nblocks(PagestreamNblocksRequest),
GetPage(PagestreamGetPageRequest),
GetLatestPage(PagestreamGetLatestPageRequest), // for compatibility with old clients
DbSize(PagestreamDbSizeRequest),
GetSlruSegment(PagestreamGetSlruSegmentRequest),
GetPage(PagestreamGetPageRequest),
}
// Wrapped in libpq CopyData
@@ -709,6 +710,14 @@ pub struct PagestreamNblocksRequest {
pub rel: RelTag,
}
/// Old-format "get page" request, kept so that old clients keep working.
///
/// It is carried by `PagestreamFeMessage::GetLatestPage` and uses message
/// tag 2 in both the serializer and the parser. The page server handler
/// translates it into a `PagestreamGetPageRequest`: `horizon` becomes
/// `Lsn::MAX` when `latest` is set and `lsn` otherwise, while `lsn`, `rel`
/// and `blkno` are copied over unchanged.
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetLatestPageRequest {
/// Sent as a single byte; any non-zero value is parsed as `true`.
pub latest: bool,
/// Request LSN, parsed as a big-endian `u64`.
pub lsn: Lsn,
/// Relation the requested block belongs to.
pub rel: RelTag,
/// Block number within `rel`, parsed as a big-endian `u32`.
pub blkno: u32,
}
#[derive(Debug, PartialEq, Eq)]
pub struct PagestreamGetPageRequest {
pub horizon: Lsn,
@@ -798,8 +807,19 @@ impl PagestreamFeMessage {
bytes.put_u8(req.rel.forknum);
}
Self::GetPage(req) => {
Self::GetLatestPage(req) => {
bytes.put_u8(2);
bytes.put_u8(u8::from(req.latest));
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
bytes.put_u32(req.rel.dbnode);
bytes.put_u32(req.rel.relnode);
bytes.put_u8(req.rel.forknum);
bytes.put_u32(req.blkno);
}
Self::GetPage(req) => {
bytes.put_u8(4);
bytes.put_u64(req.horizon.0);
bytes.put_u64(req.lsn.0);
bytes.put_u32(req.rel.spcnode);
@@ -857,7 +877,25 @@ impl PagestreamFeMessage {
forknum: body.read_u8()?,
},
})),
2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
2 => Ok(PagestreamFeMessage::GetLatestPage(
PagestreamGetLatestPageRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
spcnode: body.read_u32::<BigEndian>()?,
dbnode: body.read_u32::<BigEndian>()?,
relnode: body.read_u32::<BigEndian>()?,
forknum: body.read_u8()?,
},
blkno: body.read_u32::<BigEndian>()?,
},
)),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
rel: RelTag {
@@ -868,12 +906,7 @@ impl PagestreamFeMessage {
},
blkno: body.read_u32::<BigEndian>()?,
})),
3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest {
horizon: Lsn::from(body.read_u64::<BigEndian>()?),
lsn: Lsn::from(body.read_u64::<BigEndian>()?),
dbnode: body.read_u32::<BigEndian>()?,
})),
4 => Ok(PagestreamFeMessage::GetSlruSegment(
5 => Ok(PagestreamFeMessage::GetSlruSegment(
PagestreamGetSlruSegmentRequest {
latest: body.read_u8()? != 0,
lsn: Lsn::from(body.read_u64::<BigEndian>()?),

View File

@@ -119,11 +119,6 @@ pub fn generate_pg_control(
// Generate new pg_control needed for bootstrap
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
//reset some fields we don't want to preserve
//TODO Check this.
//We may need to determine the value from twophase data.
checkpoint.oldestActiveXid = 0;
//save new values in pg_control
pg_control.checkPoint = 0;
pg_control.checkPointCopy = checkpoint;

View File

@@ -631,6 +631,25 @@ impl PageServerHandler {
span,
)
}
PagestreamFeMessage::GetLatestPage(old_req) => {
let req = PagestreamGetPageRequest {
horizon: if old_req.latest {
Lsn::MAX
} else {
old_req.lsn
},
lsn: old_req.lsn,
rel: old_req.rel,
blkno: old_req.blkno,
};
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
(
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
}
PagestreamFeMessage::GetPage(req) => {
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);

View File

@@ -4209,6 +4209,9 @@ impl Timeline {
new_gc_cutoff
};
// Reset standby horizon to ignore it if it is not updated till next GC
self.standby_horizon.store(Lsn::INVALID);
let res = self
.gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff)
.instrument(

View File

@@ -553,9 +553,12 @@ impl ConnectionManagerState {
fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
WALRECEIVER_BROKER_UPDATES.inc();
self.timeline
.standby_horizon
.store(Lsn(timeline_update.standby_horizon));
if timeline_update.standby_horizon != 0 {
// ignore reports from safekeepers not connected to replicas
self.timeline
.standby_horizon
.store(Lsn(timeline_update.standby_horizon));
}
let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
let old_entry = self.wal_stream_candidates.insert(

View File

@@ -33,9 +33,10 @@ typedef enum
/* pagestore_client -> pagestore */
T_NeonExistsRequest = 0,
T_NeonNblocksRequest,
T_NeonGetPageRequest,
T_NeonGetLatestPageRequest, /* old format of get_page command */
T_NeonDbSizeRequest,
T_NeonGetSlruSegmentRequest,
T_NeonGetPageRequest,
/* pagestore -> pagestore_client */
T_NeonExistsResponse = 100,

View File

@@ -37,8 +37,6 @@ const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
// neon extension of replication protocol
const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
const MAX_STANDBY_LAG: u64 = 1024 * 1024 * 1024; // 1Gb
type FullTransactionId = u64;
/// Hot standby feedback received from replica
@@ -270,27 +268,21 @@ impl WalSendersShared {
agg.ts = max(agg.ts, hs_feedback.ts);
}
let reply = standby_feedback.reply;
if reply.write_lsn != Lsn::INVALID
&& self.agg_ps_feedback.last_received_lsn < reply.write_lsn + MAX_STANDBY_LAG
{
if reply.write_lsn != Lsn::INVALID {
if reply_agg.write_lsn != Lsn::INVALID {
reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
} else {
reply_agg.write_lsn = reply.write_lsn;
}
}
if reply.flush_lsn != Lsn::INVALID
&& self.agg_ps_feedback.last_received_lsn < reply.flush_lsn + MAX_STANDBY_LAG
{
if reply.flush_lsn != Lsn::INVALID {
if reply_agg.flush_lsn != Lsn::INVALID {
reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
} else {
reply_agg.flush_lsn = reply.flush_lsn;
}
}
if reply.apply_lsn != Lsn::INVALID
&& self.agg_ps_feedback.last_received_lsn < reply.apply_lsn + MAX_STANDBY_LAG
{
if reply.apply_lsn != Lsn::INVALID {
if reply_agg.apply_lsn != Lsn::INVALID {
reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
} else {

View File

@@ -7,12 +7,12 @@ from fixtures.neon_fixtures import NeonEnv, PgBin, wait_replica_caughtup
def test_replication_lag(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
n_iterations = 10
n_iterations = 60
# Use aggressive GC and checkpoint settings
tenant, _ = env.neon_cli.create_tenant(
conf={
"gc_period": "5 s",
"gc_period": "15 s", # should not be smaller than wal_receiver_status_interval
"gc_horizon": f"{1024 ** 2}",
"checkpoint_distance": f"{1024 ** 2}",
"compaction_target_size": f"{1024 ** 2}",
@@ -23,7 +23,7 @@ def test_replication_lag(neon_simple_env: NeonEnv, pg_bin: PgBin):
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-T30", connstr])
pg_bin.run_capture(["pgbench", "-T60", connstr])
with env.endpoints.create_start(
branch_name="main", endpoint_id="primary", tenant_id=tenant

View File

@@ -7,7 +7,9 @@ use std::{
io::BufReader,
};
use pageserver_api::models::{PagestreamFeMessage, PagestreamGetPageRequest};
use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetLatestPageRequest, PagestreamGetPageRequest,
};
use utils::id::{ConnectionId, TenantId, TimelineId};
use clap::{Parser, Subcommand};
@@ -51,9 +53,11 @@ enum Command {
// - detect any prefetching anomalies by looking for negative deltas during seqscan
fn analyze_trace<R: std::io::Read>(mut reader: R) {
let mut total = 0; // Total requests traced
let mut old = 0; // Old requests traced
let mut cross_rel = 0; // Requests that ask for different rel than previous request
let mut deltas = HashMap::<i32, u32>::new(); // Consecutive blkno differences
let mut prev: Option<PagestreamGetPageRequest> = None;
let mut old_prev: Option<PagestreamGetLatestPageRequest> = None;
// Compute stats
while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) {
@@ -61,6 +65,20 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
PagestreamFeMessage::Exists(_) => {}
PagestreamFeMessage::Nblocks(_) => {}
PagestreamFeMessage::GetSlruSegment(_) => {}
PagestreamFeMessage::GetLatestPage(req) => {
total += 1;
old += 1;
if let Some(prev) = old_prev {
if prev.rel == req.rel {
let delta = (req.blkno as i32) - (prev.blkno as i32);
deltas.entry(delta).and_modify(|c| *c += 1).or_insert(1);
} else {
cross_rel += 1;
}
}
old_prev = Some(req);
}
PagestreamFeMessage::GetPage(req) => {
total += 1;
@@ -83,6 +101,7 @@ fn analyze_trace<R: std::io::Read>(mut reader: R) {
deltas.retain(|_, count| *count > 300);
other -= deltas.len();
dbg!(total);
dbg!(old);
dbg!(cross_rel);
dbg!(other);
dbg!(deltas);

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "f7ea954989a2e7901f858779cff55259f203479a",
"postgres-v15": "81e16cd537053f49e175d4a08ab7c8aec3d9b535",
"postgres-v14": "be7a65fe67dc81d85bbcbebb13e00d94715f4b88"
"postgres-v16": "70675be8c27bf861fee555eecd199f2e0f921d1b",
"postgres-v15": "d7d3a44f3219b189b161df030f4ca0229f0d06dc",
"postgres-v14": "08d37139bf30ca1167c64c3404074970cfe66f5a"
}