diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index d1ecbc2ff3..e1b7c7d08c 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -654,9 +654,10 @@ pub struct WalRedoManagerStatus { pub enum PagestreamFeMessage { Exists(PagestreamExistsRequest), Nblocks(PagestreamNblocksRequest), - GetPage(PagestreamGetPageRequest), + GetLatestPage(PagestreamGetLatestPageRequest), // for compatinility with old clients DbSize(PagestreamDbSizeRequest), GetSlruSegment(PagestreamGetSlruSegmentRequest), + GetPage(PagestreamGetPageRequest), } // Wrapped in libpq CopyData @@ -709,6 +710,14 @@ pub struct PagestreamNblocksRequest { pub rel: RelTag, } +#[derive(Debug, PartialEq, Eq)] +pub struct PagestreamGetLatestPageRequest { + pub latest: bool, + pub lsn: Lsn, + pub rel: RelTag, + pub blkno: u32, +} + #[derive(Debug, PartialEq, Eq)] pub struct PagestreamGetPageRequest { pub horizon: Lsn, @@ -798,8 +807,19 @@ impl PagestreamFeMessage { bytes.put_u8(req.rel.forknum); } - Self::GetPage(req) => { + Self::GetLatestPage(req) => { bytes.put_u8(2); + bytes.put_u8(u8::from(req.latest)); + bytes.put_u64(req.lsn.0); + bytes.put_u32(req.rel.spcnode); + bytes.put_u32(req.rel.dbnode); + bytes.put_u32(req.rel.relnode); + bytes.put_u8(req.rel.forknum); + bytes.put_u32(req.blkno); + } + + Self::GetPage(req) => { + bytes.put_u8(4); bytes.put_u64(req.horizon.0); bytes.put_u64(req.lsn.0); bytes.put_u32(req.rel.spcnode); @@ -857,7 +877,25 @@ impl PagestreamFeMessage { forknum: body.read_u8()?, }, })), - 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { + 2 => Ok(PagestreamFeMessage::GetLatestPage( + PagestreamGetLatestPageRequest { + latest: body.read_u8()? != 0, + lsn: Lsn::from(body.read_u64::()?), + rel: RelTag { + spcnode: body.read_u32::()?, + dbnode: body.read_u32::()?, + relnode: body.read_u32::()?, + forknum: body.read_u8()?, + }, + blkno: body.read_u32::()?, + }, + )), + 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { + horizon: Lsn::from(body.read_u64::()?), + lsn: Lsn::from(body.read_u64::()?), + dbnode: body.read_u32::()?, + })), + 4 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { horizon: Lsn::from(body.read_u64::()?), lsn: Lsn::from(body.read_u64::()?), rel: RelTag { @@ -868,12 +906,7 @@ impl PagestreamFeMessage { }, blkno: body.read_u32::()?, })), - 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - horizon: Lsn::from(body.read_u64::()?), - lsn: Lsn::from(body.read_u64::()?), - dbnode: body.read_u32::()?, - })), - 4 => Ok(PagestreamFeMessage::GetSlruSegment( + 5 => Ok(PagestreamFeMessage::GetSlruSegment( PagestreamGetSlruSegmentRequest { latest: body.read_u8()? != 0, lsn: Lsn::from(body.read_u64::()?), diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index a863fad269..2aed128808 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -119,11 +119,6 @@ pub fn generate_pg_control( // Generate new pg_control needed for bootstrap checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0; - //reset some fields we don't want to preserve - //TODO Check this. - //We may need to determine the value from twophase data. - checkpoint.oldestActiveXid = 0; - //save new values in pg_control pg_control.checkPoint = 0; pg_control.checkPointCopy = checkpoint; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index a4756e34f5..6ce716d1d9 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -631,6 +631,25 @@ impl PageServerHandler { span, ) } + PagestreamFeMessage::GetLatestPage(old_req) => { + let req = PagestreamGetPageRequest { + horizon: if old_req.latest { + Lsn::MAX + } else { + old_req.lsn + }, + lsn: old_req.lsn, + rel: old_req.rel, + blkno: old_req.blkno, + }; + let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn); + ( + self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx) + .instrument(span.clone()) + .await, + span, + ) + } PagestreamFeMessage::GetPage(req) => { // shard_id is filled in by the handler let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a817a70543..6f77301943 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -4209,6 +4209,9 @@ impl Timeline { new_gc_cutoff }; + // Reset standby horizon to ignore it if it is not updated till next GC + self.standby_horizon.store(Lsn::INVALID); + let res = self .gc_timeline(horizon_cutoff, pitr_cutoff, retain_lsns, new_gc_cutoff) .instrument( diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 7c31654902..10aaca9b76 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -553,9 +553,12 @@ impl ConnectionManagerState { fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) { WALRECEIVER_BROKER_UPDATES.inc(); - self.timeline - .standby_horizon - .store(Lsn(timeline_update.standby_horizon)); + if timeline_update.standby_horizon != 0 { + // ignore reports from safekeepers mnot connected to replicas + self.timeline + .standby_horizon + .store(Lsn(timeline_update.standby_horizon)); + } let new_safekeeper_id = NodeId(timeline_update.safekeeper_id); let old_entry = self.wal_stream_candidates.insert( diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index c64750a1c6..1e91eb8c99 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -33,9 +33,10 @@ typedef enum /* pagestore_client -> pagestore */ T_NeonExistsRequest = 0, T_NeonNblocksRequest, - T_NeonGetPageRequest, + T_NeonGetLatestPageRequest, /* old format of get_page command */ T_NeonDbSizeRequest, T_NeonGetSlruSegmentRequest, + T_NeonGetPageRequest, /* pagestore -> pagestore_client */ T_NeonExistsResponse = 100, diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index a2956799dc..33f5e598bc 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -37,8 +37,6 @@ const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r'; // neon extension of replication protocol const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z'; -const MAX_STANDBY_LAG: u64 = 1024 * 1024 * 1024; // 1Gb - type FullTransactionId = u64; /// Hot standby feedback received from replica @@ -270,27 +268,21 @@ impl WalSendersShared { agg.ts = max(agg.ts, hs_feedback.ts); } let reply = standby_feedback.reply; - if reply.write_lsn != Lsn::INVALID - && self.agg_ps_feedback.last_received_lsn < reply.write_lsn + MAX_STANDBY_LAG - { + if reply.write_lsn != Lsn::INVALID { if reply_agg.write_lsn != Lsn::INVALID { reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn); } else { reply_agg.write_lsn = reply.write_lsn; } } - if reply.flush_lsn != Lsn::INVALID - && self.agg_ps_feedback.last_received_lsn < reply.flush_lsn + MAX_STANDBY_LAG - { + if reply.flush_lsn != Lsn::INVALID { if reply_agg.flush_lsn != Lsn::INVALID { reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn); } else { reply_agg.flush_lsn = reply.flush_lsn; } } - if reply.apply_lsn != Lsn::INVALID - && self.agg_ps_feedback.last_received_lsn < reply.apply_lsn + MAX_STANDBY_LAG - { + if reply.apply_lsn != Lsn::INVALID { if reply_agg.apply_lsn != Lsn::INVALID { reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn); } else { diff --git a/test_runner/regress/test_replication_lag.py b/test_runner/regress/test_replication_lag.py index 714b3ce951..9a5cdafd63 100644 --- a/test_runner/regress/test_replication_lag.py +++ b/test_runner/regress/test_replication_lag.py @@ -7,12 +7,12 @@ from fixtures.neon_fixtures import NeonEnv, PgBin, wait_replica_caughtup def test_replication_lag(neon_simple_env: NeonEnv, pg_bin: PgBin): env = neon_simple_env - n_iterations = 10 + n_iterations = 60 # Use aggressive GC and checkpoint settings tenant, _ = env.neon_cli.create_tenant( conf={ - "gc_period": "5 s", + "gc_period": "15 s", # should not be smaller than wal_receiver_status_interval "gc_horizon": f"{1024 ** 2}", "checkpoint_distance": f"{1024 ** 2}", "compaction_target_size": f"{1024 ** 2}", @@ -23,7 +23,7 @@ def test_replication_lag(neon_simple_env: NeonEnv, pg_bin: PgBin): def run_pgbench(connstr: str): log.info(f"Start a pgbench workload on pg {connstr}") - pg_bin.run_capture(["pgbench", "-T30", connstr]) + pg_bin.run_capture(["pgbench", "-T60", connstr]) with env.endpoints.create_start( branch_name="main", endpoint_id="primary", tenant_id=tenant diff --git a/trace/src/main.rs b/trace/src/main.rs index 4605c124e9..3a57af57fa 100644 --- a/trace/src/main.rs +++ b/trace/src/main.rs @@ -7,7 +7,9 @@ use std::{ io::BufReader, }; -use pageserver_api::models::{PagestreamFeMessage, PagestreamGetPageRequest}; +use pageserver_api::models::{ + PagestreamFeMessage, PagestreamGetLatestPageRequest, PagestreamGetPageRequest, +}; use utils::id::{ConnectionId, TenantId, TimelineId}; use clap::{Parser, Subcommand}; @@ -51,9 +53,11 @@ enum Command { // - detect any prefetching anomalies by looking for negative deltas during seqscan fn analyze_trace(mut reader: R) { let mut total = 0; // Total requests traced + let mut old = 0; // Old requests traced let mut cross_rel = 0; // Requests that ask for different rel than previous request let mut deltas = HashMap::::new(); // Consecutive blkno differences let mut prev: Option = None; + let mut old_prev: Option = None; // Compute stats while let Ok(msg) = PagestreamFeMessage::parse(&mut reader) { @@ -61,6 +65,20 @@ fn analyze_trace(mut reader: R) { PagestreamFeMessage::Exists(_) => {} PagestreamFeMessage::Nblocks(_) => {} PagestreamFeMessage::GetSlruSegment(_) => {} + PagestreamFeMessage::GetLatestPage(req) => { + total += 1; + old += 1; + + if let Some(prev) = old_prev { + if prev.rel == req.rel { + let delta = (req.blkno as i32) - (prev.blkno as i32); + deltas.entry(delta).and_modify(|c| *c += 1).or_insert(1); + } else { + cross_rel += 1; + } + } + old_prev = Some(req); + } PagestreamFeMessage::GetPage(req) => { total += 1; @@ -83,6 +101,7 @@ fn analyze_trace(mut reader: R) { deltas.retain(|_, count| *count > 300); other -= deltas.len(); dbg!(total); + dbg!(old); dbg!(cross_rel); dbg!(other); dbg!(deltas); diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index be7a65fe67..23a19fe0a4 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit be7a65fe67dc81d85bbcbebb13e00d94715f4b88 +Subproject commit 23a19fe0a4f6e56f4540afabe6825281c9d02cd1 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 81e16cd537..d4d552ce42 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 81e16cd537053f49e175d4a08ab7c8aec3d9b535 +Subproject commit d4d552ce42e40f29909f1c6613c57ecda6ea6933 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index f7ea954989..0d4eb408a1 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit f7ea954989a2e7901f858779cff55259f203479a +Subproject commit 0d4eb408a1242f8564ba2bfa57a1ea601dc99a4c diff --git a/vendor/revisions.json b/vendor/revisions.json index 80699839ba..e1dce85208 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,5 @@ { - "postgres-v16": "f7ea954989a2e7901f858779cff55259f203479a", - "postgres-v15": "81e16cd537053f49e175d4a08ab7c8aec3d9b535", - "postgres-v14": "be7a65fe67dc81d85bbcbebb13e00d94715f4b88" + "postgres-v16": "70675be8c27bf861fee555eecd199f2e0f921d1b", + "postgres-v15": "d7d3a44f3219b189b161df030f4ca0229f0d06dc", + "postgres-v14": "08d37139bf30ca1167c64c3404074970cfe66f5a" }