Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-17 18:32:56 +00:00
Compare commits
7 Commits
sk-basic-b ... debug-chec
| Author | SHA1 | Date |
|---|---|---|
| | 394e675db3 | |
| | 507c4bd7b4 | |
| | caa3a42cdb | |
| | be8a6bcdb4 | |
| | 71c7ba756d | |
| | 47f898710c | |
| | cafd4b52ca | |
@@ -1832,7 +1832,7 @@ const CONTROLFILE_KEY: Key = Key {
     field6: 0,
 };
 
-const CHECKPOINT_KEY: Key = Key {
+pub const CHECKPOINT_KEY: Key = Key {
     field1: 0x03,
     field2: 0,
     field3: 0,
@@ -884,7 +884,7 @@ impl DeltaLayerInner {
 
         let keys = self.load_keys(ctx).await?;
 
-        async fn dump_blob(val: ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
+        async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
             let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
             let val = Value::des(&buf)?;
             let desc = match val {
@@ -905,14 +905,30 @@ impl DeltaLayerInner {
         }
 
         for entry in keys {
-            let DeltaEntry { key, lsn, val, .. } = entry;
-            let desc = match dump_blob(val, ctx).await {
+            let DeltaEntry { key, lsn, val, .. } = entry;
+            let desc = match dump_blob(&val, ctx).await {
                 Ok(desc) => desc,
                 Err(err) => {
                     format!("ERROR: {err}")
                 }
             };
             println!(" key {key} at {lsn}: {desc}");
+            use crate::pgdatadir_mapping::CHECKPOINT_KEY;
+            use postgres_ffi::CheckPoint;
+            if key == CHECKPOINT_KEY
+            {
+                let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
+                let val = Value::des(&buf)?;
+                match val {
+                    Value::Image(img) => {
+                        let checkpoint = CheckPoint::decode(&img)?;
+                        println!(" CHECKPOINT: {:?}", checkpoint);
+                    }
+                    Value::WalRecord(_rec) => {
+                        println!(" unexpected walrecord for checkpoint key");
+                    }
+                }
+            }
         }
 
         Ok(())
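For orientation between the hunks: the value stored under CHECKPOINT_KEY decodes to the CheckPoint struct, whose nextXid field is a 64-bit quantity combining a 32-bit epoch (upper half) with a 32-bit XID (lower half). The '1:1024' value discussed in the walingest hunk below is epoch 1, xid 1024. A minimal standalone sketch of that split; FullXid here is a hypothetical stand-in, not postgres_ffi's actual FullTransactionId bindings type:

// Hypothetical stand-in for postgres_ffi's FullTransactionId, for illustration only.
struct FullXid {
    value: u64,
}

impl FullXid {
    fn epoch(&self) -> u32 {
        (self.value >> 32) as u32 // upper 32 bits
    }
    fn xid(&self) -> u32 {
        self.value as u32 // lower 32 bits
    }
}

fn main() {
    // The damaged value discussed below: 2^32 + 1024.
    let next_xid = FullXid { value: 4294968320 };
    assert_eq!((next_xid.epoch(), next_xid.xid()), (1, 1024));
    println!("nextXid = {}:{}", next_xid.epoch(), next_xid.xid()); // prints "1:1024"
}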
@@ -26,8 +26,11 @@ use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
 use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
 use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
 
+use std::str::FromStr;
+
 use anyhow::{bail, Context, Result};
 use bytes::{Buf, Bytes, BytesMut};
+use hex::FromHex;
 use tracing::*;
 use utils::failpoint_support;
 
@@ -43,9 +46,10 @@ use postgres_ffi::pg_constants;
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
 use postgres_ffi::v14::xlog_utils::*;
-use postgres_ffi::v14::CheckPoint;
+use postgres_ffi::v14::{bindings::FullTransactionId, CheckPoint};
 use postgres_ffi::TransactionId;
 use postgres_ffi::BLCKSZ;
+use utils::id::TenantId;
 use utils::lsn::Lsn;
 
 pub struct WalIngest {
@@ -108,6 +112,55 @@ impl WalIngest {
             self.checkpoint_modified = true;
         }
 
+        // BEGIN ONE-OFF HACK
+        //
+        // We had a bug where we incorrectly passed 0 to update_next_xid(). That was
+        // harmless as long as nextXid was < 2^31, because 0 looked like a very old
+        // XID. But once nextXid reaches 2^31, 0 starts to look like a very new XID,
+        // and we incorrectly bumped up nextXid to the next epoch, to the value '1:1024'.
+        //
+        // We have one known timeline in production where that happened. This is a
+        // one-off fix to repair that damage. The last WAL record on that timeline as
+        // of this writing is this:
+        //
+        // rmgr: Standby len (rec/tot): 50/ 50, tx: 0, lsn: 35A/E32D86D8, prev 35A/E32D86B0, desc: RUNNING_XACTS nextXid 2325447052 latestCompletedXid 2325447051 oldestRunningXid 2325447052
+        //
+        // So on that particular timeline, before that LSN, fix the incorrectly set
+        // nextXid to the nextXid value from that record, plus 1000 to give some
+        // safety margin.
+
+        // For testing this hack, this failpoint temporarily re-introduces the bug
+        // that was fixed.
+        fn reintroduce_bug_failpoint_activated() -> bool {
+            fail::fail_point!("reintroduce-nextxid-update-bug", |_| { true });
+            false
+        }
+        if decoded.xl_xid == pg_constants::INVALID_TRANSACTION_ID
+            && reintroduce_bug_failpoint_activated()
+            && self.checkpoint.update_next_xid(decoded.xl_xid)
+        {
+            info!(
+                "failpoint: Incorrectly updated nextXid at LSN {} to {}",
+                lsn, self.checkpoint.nextXid.value
+            );
+            self.checkpoint_modified = true;
+        }
+
+        if self.checkpoint.nextXid.value == 4294968320 && // 1:1024, the incorrect value
+            modification.tline.tenant_shard_id.tenant_id == TenantId::from_hex("df254570a4f603805528b46b0d45a76c").unwrap() &&
+            lsn < Lsn::from_str("35A/E32D9000").unwrap() &&
+            !reintroduce_bug_failpoint_activated()
+        {
+            // This is the last nextXid value from the last RUNNING_XACTS record, at
+            // the end of the WAL as of this writing.
+            self.checkpoint.nextXid = FullTransactionId {
+                value: 2325447052 + 1000,
+            };
+            self.checkpoint_modified = true;
+            warn!("nextXid fixed by one-off hack at LSN {}", lsn);
+        }
+        // END ONE-OFF HACK
+
         match decoded.xl_rmid {
             pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
                 // Heap AM records need some special handling, because they modify VM pages
 
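Why the buggy update_next_xid(0) call flips from harmless to harmful exactly at 2^31: XID comparisons are circular, so an XID counts as "in the future" when it is less than 2^31 ahead of nextXid in modulo-2^32 arithmetic. A minimal sketch of that rule, assuming update_next_xid() follows the standard Postgres wraparound comparison (an illustration, not the postgres_ffi implementation):

// Illustration of circular XID comparison (assumed behavior of update_next_xid,
// following the standard Postgres modulo-2^32 rule; not the actual implementation).
fn looks_newer(next_xid: u32, xid: u32) -> bool {
    // Wrapping distance from next_xid to xid; less than 2^31 means "ahead".
    xid.wrapping_sub(next_xid) < 0x8000_0000
}

fn main() {
    // While nextXid is below 2^31, xid 0 is ~2 billion behind: it looks ancient.
    assert!(!looks_newer(2_000_000_000, 0));
    // Once nextXid passes 2^31, the wrapping distance to 0 drops below 2^31,
    // so xid 0 suddenly looks like a future XID and the buggy update_next_xid(0)
    // bumped nextXid into the next epoch.
    assert!(looks_newer(2_325_447_052, 0));
}

That the damaged value lands on 1:1024 rather than 1:0 suggests update_next_xid rounds nextXid up in 1024-XID steps; that is an inference, the diff itself does not show the rounding.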
@@ -2998,6 +2998,23 @@ class Endpoint(PgProtocol):
     ):
         self.stop()
 
+    def log_contains(self, pattern: str) -> Optional[str]:
+        """Check whether the compute log contains a line matching the given regex; return the first match."""
+        logfile = self.endpoint_path() / "compute.log"
+        if not logfile.exists():
+            log.warning(f"Skipping log check: {logfile} does not exist")
+            return None
+
+        contains_re = re.compile(pattern)
+
+        with logfile.open("r") as f:
+            for line in f:
+                if contains_re.search(line):
+                    # found it!
+                    return line
+
+        return None
+
     # Checkpoints running endpoint and returns pg_wal size in MB.
     def get_pg_wal_size(self):
         log.info(f'checkpointing at LSN {self.safe_psql("select pg_current_wal_lsn()")[0][0]}')
@@ -3,10 +3,12 @@ import os
 import time
 from pathlib import Path
 
+import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_wal_insert_lsn
 from fixtures.pageserver.utils import (
     wait_for_last_record_lsn,
     wait_for_upload,
 )
+from fixtures.remote_storage import RemoteStorageKind
 from fixtures.types import Lsn, TenantId, TimelineId
@@ -98,7 +100,7 @@ def test_import_at_2bil(
     vanilla_pg.safe_psql("CREATE TABLE t (t text);")
     vanilla_pg.safe_psql("INSERT INTO t VALUES ('inserted in vanilla')")
 
-    endpoint_id = "ep-import_from_vanilla"
+    branch_name = "import_from_vanilla"
     tenant = TenantId.generate()
     timeline = TimelineId.generate()
 
@@ -138,7 +140,7 @@ def test_import_at_2bil(
             "--timeline-id",
             str(timeline),
             "--node-name",
-            endpoint_id,
+            branch_name,
             "--base-lsn",
             start_lsn,
             "--base-tarfile",
@@ -157,7 +159,8 @@ def test_import_at_2bil(
     wait_for_last_record_lsn(ps_http, tenant, timeline, Lsn(end_lsn))
 
     endpoint = env.endpoints.create_start(
-        endpoint_id,
+        branch_name,
+        endpoint_id="ep-import_from_vanilla",
         tenant_id=tenant,
         config_lines=[
             "log_autovacuum_min_duration = 0",
@@ -166,7 +169,6 @@ def test_import_at_2bil(
     )
     assert endpoint.safe_psql("select count(*) from t") == [(1,)]
 
-    # Ok, consume
     conn = endpoint.connect()
     cur = conn.cursor()
 
@@ -218,3 +220,212 @@ def test_import_at_2bil(
     cur = conn.cursor()
     cur.execute("SELECT count(*) from t")
     assert cur.fetchone() == (10000 + 1,)
+
+
+# This is a follow-up to the test_import_at_2bil test.
+#
+# Use a failpoint to reintroduce the bug that test_import_at_2bil also
+# tests. Then, after the damage has been done, clear the failpoint to
+# fix the bug. Check that the one-off hack that we added for a particular
+# timeline that hit this in production fixes the broken timeline.
+def test_one_off_hack_for_nextxid_bug(
+    neon_env_builder: NeonEnvBuilder,
+    test_output_dir: Path,
+    pg_distrib_dir: Path,
+    pg_bin,
+    vanilla_pg,
+):
+    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
+    env = neon_env_builder.init_start()
+    ps_http = env.pageserver.http_client()
+
+    env.pageserver.allowed_errors.append(".*nextXid fixed by one-off hack.*")
+
+    # We begin with the old bug still present, to create a broken timeline
+    ps_http.configure_failpoints(("reintroduce-nextxid-update-bug", "return(true)"))
+
+    # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
+    psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
+
+    # Reset the vanilla Postgres instance to just under 2^31 transactions,
+    # and to around the same LSN as on the production timeline.
+    pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
+    cmd = [
+        pg_resetwal_path,
+        "--next-transaction-id=2129920000",
+        "-l",
+        "000000010000035A000000E0",
+        "-D",
+        str(vanilla_pg.pgdatadir),
+    ]
+    pg_bin.run_capture(cmd, env=psql_env)
+
+    vanilla_pg.start()
+    vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
+    vanilla_pg.safe_psql(
+        """create table tt as select 'long string to consume some space' || g
+           from generate_series(1,300000) g"""
+    )
+    assert vanilla_pg.safe_psql("select count(*) from tt") == [(300000,)]
+    vanilla_pg.safe_psql("CREATE TABLE t (t text);")
+    vanilla_pg.safe_psql("INSERT INTO t VALUES ('inserted in vanilla')")
+
+    branch_name = "import_from_vanilla"
+    # This is the tenant that the one-off hack targets
+    tenant = TenantId("df254570a4f603805528b46b0d45a76c")
+    timeline = TimelineId.generate()
+
+    env.pageserver.tenant_create(tenant)
+
+    # Take basebackup
+    basebackup_dir = os.path.join(test_output_dir, "basebackup")
+    base_tar = os.path.join(basebackup_dir, "base.tar")
+    wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
+    os.mkdir(basebackup_dir)
+    vanilla_pg.safe_psql("CHECKPOINT")
+    pg_bin.run(
+        [
+            "pg_basebackup",
+            "-F",
+            "tar",
+            "-d",
+            vanilla_pg.connstr(),
+            "-D",
+            basebackup_dir,
+        ]
+    )
+
+    # Get start_lsn and end_lsn
+    with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
+        manifest = json.load(f)
+        start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
+        end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
+
+    def import_tar(base, wal):
+        env.neon_cli.raw_cli(
+            [
+                "timeline",
+                "import",
+                "--tenant-id",
+                str(tenant),
+                "--timeline-id",
+                str(timeline),
+                "--node-name",
+                branch_name,
+                "--base-lsn",
+                start_lsn,
+                "--base-tarfile",
+                base,
+                "--end-lsn",
+                end_lsn,
+                "--wal-tarfile",
+                wal,
+                "--pg-version",
+                env.pg_version,
+            ]
+        )
+
+    # Importing correct backup works
+    import_tar(base_tar, wal_tar)
+    wait_for_last_record_lsn(ps_http, tenant, timeline, Lsn(end_lsn))
+
+    endpoint = env.endpoints.create_start(
+        branch_name,
+        endpoint_id="ep-import_from_vanilla",
+        tenant_id=tenant,
+        config_lines=[
+            "log_autovacuum_min_duration = 0",
+            "autovacuum_naptime='5 s'",
+        ],
+    )
+    assert endpoint.safe_psql("select count(*) from t") == [(1,)]
+
+    conn = endpoint.connect()
+    cur = conn.cursor()
+
+    # Install extension containing function needed for test
+    cur.execute("CREATE EXTENSION neon_test_utils")
+
+    # Advance nextXid close to the target XID, which is somewhat above
+    # the 2^31 mark.
+    while True:
+        xid = int(query_scalar(cur, "SELECT txid_current()"))
+        log.info(f"xid now {xid}")
+        # Consume XIDs in chunks of 50k until within 100k of the target,
+        # then in chunks of 5k until within 10k of it.
+        if xid < (2325447052 - 100000):
+            cur.execute("select test_consume_xids(50000);")
+        elif xid < 2325447052 - 10000:
+            cur.execute("select test_consume_xids(5000);")
+        else:
+            break
+
+    # Run a bunch of real INSERTs to cross over the target XID.
+    cur.execute(
+        """
+        do $$
+        begin
+          for i in 1..10000 loop
+            -- Use a begin-exception block to generate a new subtransaction on each iteration
+            begin
+              insert into t values (i);
+            exception when others then
+              raise 'not expected %', sqlerrm;
+            end;
+          end loop;
+        end;
+        $$;
+        """
+    )
+    # A checkpoint writes a WAL record with xl_xid=0. Many other WAL
+    # records would have the same effect.
+    cur.execute("checkpoint")
+
+    # Ok, the nextXid in the pageserver at this LSN should now be incorrectly
+    # set to 1:1024. Remember this LSN.
+    broken_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
+
+    # Ensure that the broken checkpoint data has reached permanent storage
+    ps_http.timeline_checkpoint(tenant, timeline)
+    wait_for_upload(ps_http, tenant, timeline, broken_lsn)
+
+    # Now fix the bug, and generate some WAL with XIDs
+    ps_http.configure_failpoints(("reintroduce-nextxid-update-bug", "off"))
+    cur.execute("INSERT INTO t VALUES ('after fix')")
+    fixed_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
+
+    log.info(f"nextXid was broken by {broken_lsn}, and fixed again by {fixed_lsn}")
+
+    # Stop the original endpoint, we don't need it anymore.
+    endpoint.stop()
+
+    # Test that we cannot start a new endpoint at the broken LSN.
+    env.neon_cli.create_branch(
+        "at-broken-lsn", branch_name, ancestor_start_lsn=broken_lsn, tenant_id=tenant
+    )
+    endpoint_broken = env.endpoints.create(
+        "at-broken-lsn",
+        endpoint_id="ep-at-broken-lsn",
+        tenant_id=tenant,
+    )
+    with pytest.raises(RuntimeError, match="Postgres exited unexpectedly with code 1"):
+        endpoint_broken.start()
+    assert endpoint_broken.log_contains(
+        'Could not open file "pg_xact/0000": No such file or directory'
+    )
+
+    # But after the bug was fixed, the one-off hack fixed the timeline,
+    # and a later LSN works.
+    env.neon_cli.create_branch(
+        "at-fixed-lsn", branch_name, ancestor_start_lsn=fixed_lsn, tenant_id=tenant
+    )
+    endpoint_fixed = env.endpoints.create_start(
+        "at-fixed-lsn", endpoint_id="ep-at-fixed-lsn", tenant_id=tenant
+    )
+
+    conn = endpoint_fixed.connect()
+    cur = conn.cursor()
+    cur.execute("SELECT count(*) from t")
+    # One "inserted in vanilla" row, 10000 in the DO-loop, and one "after fix" row
+    assert cur.fetchone() == (1 + 10000 + 1,)
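A note on the expected failure message: with nextXid damaged to 1:1024, Postgres startup consults the transaction status (clog) for XIDs near 1024, which live in the very first pg_xact segment, while the timeline's real history starts around XID 2.3 billion, so segment 0000 does not exist. A small sketch of the standard Postgres clog segment arithmetic (stock constants, assuming the usual 2-bits-per-XID clog format):

// Map an XID to its pg_xact (clog) segment file name, using the standard
// Postgres layout: 2 status bits per XID, 8192-byte pages, 32 pages per segment.
const BLCKSZ: u32 = 8192;
const CLOG_XACTS_PER_BYTE: u32 = 4;
const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ * CLOG_XACTS_PER_BYTE;
const SLRU_PAGES_PER_SEGMENT: u32 = 32;

fn clog_segment_name(xid: u32) -> String {
    let segno = xid / (CLOG_XACTS_PER_PAGE * SLRU_PAGES_PER_SEGMENT);
    format!("{:04X}", segno)
}

fn main() {
    // The bogus nextXid (xid part 1024) points startup at the first segment...
    assert_eq!(clog_segment_name(1024), "0000");
    // ...while the timeline's actual XIDs map to a segment that does exist.
    assert_eq!(clog_segment_name(2325447052), "08A9");
}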