diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index ef5baeb570..a4a6af455c 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -12,6 +12,8 @@ use chrono::{NaiveDateTime, Utc}; use fail::fail_point; use futures::StreamExt; use postgres::{SimpleQueryMessage, SimpleQueryRow}; +use postgres_ffi::v14::xlog_utils::normalize_lsn; +use postgres_ffi::WAL_SEGMENT_SIZE; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; use tokio::{pin, select, sync::watch, time}; @@ -156,6 +158,14 @@ pub async fn handle_walreceiver_connection( // There might be some padding after the last full record, skip it. startpoint += startpoint.calc_padding(8u32); + // If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers + // for anything, and in some corner cases, the compute node might have never generated the WAL for page headers. + // That happens if you create a branch at a page boundary: the start point of the branch is at the page boundary, + // but when the compute node first starts on the branch, we normalize the first REDO position to just after the page + // header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node + // to the safekeepers.
+ startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE); + info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}..."); let query = format!("START_REPLICATION PHYSICAL {startpoint}"); diff --git a/test_runner/regress/test_branching.py b/test_runner/regress/test_branching.py index 0c1490294d..3b78700e9f 100644 --- a/test_runner/regress/test_branching.py +++ b/test_runner/regress/test_branching.py @@ -6,6 +6,8 @@ from typing import List import pytest from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres +from fixtures.types import Lsn +from fixtures.utils import query_scalar from performance.test_perf_pgbench import get_scales_matrix @@ -88,3 +90,39 @@ def test_branching_with_pgbench( for pg in pgs: res = pg.safe_psql("SELECT count(*) from pgbench_accounts") assert res[0] == (100000 * scale,) + + +# Test branching from an "unnormalized" LSN. +# +# Context: +# When doing basebackup for a newly created branch, pageserver generates a +# 'pg_control' file to bootstrap the WAL segment by specifying the redo position +# as a "normalized" LSN based on the timeline's starting LSN: +# +# checkpoint.redo = normalize_lsn(self.lsn, pg_constants::WAL_SEGMENT_SIZE).0; +# +# This test checks if the pageserver is able to handle an "unnormalized" starting LSN. +# +# Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186 +def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin): + XLOG_BLCKSZ = 8192 + + env = neon_simple_env + + env.neon_cli.create_branch("b0") + pg0 = env.postgres.create_start("b0") + + pg_bin.run_capture(["pgbench", "-i", pg0.connstr()]) + + with pg0.cursor() as cur: + curr_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()")) + + # Specify the `start_lsn` as a number that is divisible by `XLOG_BLCKSZ` + # and is smaller than `curr_lsn`.
+ start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ) + + log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...") + env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn) + pg1 = env.postgres.create_start("b1") + + pg_bin.run_capture(["pgbench", "-i", pg1.connstr()])