mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-04 17:00:37 +00:00
Compare commits
1 Commits
debug-chec
...
sk-patch-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ee46acee26 |
@@ -1,2 +1,2 @@
|
|||||||
[profile.default]
|
[profile.default]
|
||||||
slow-timeout = { period = "20s", terminate-after = 3 }
|
slow-timeout = "1m"
|
||||||
|
|||||||
@@ -1832,7 +1832,7 @@ const CONTROLFILE_KEY: Key = Key {
|
|||||||
field6: 0,
|
field6: 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub const CHECKPOINT_KEY: Key = Key {
|
const CHECKPOINT_KEY: Key = Key {
|
||||||
field1: 0x03,
|
field1: 0x03,
|
||||||
field2: 0,
|
field2: 0,
|
||||||
field3: 0,
|
field3: 0,
|
||||||
|
|||||||
@@ -884,7 +884,7 @@ impl DeltaLayerInner {
|
|||||||
|
|
||||||
let keys = self.load_keys(ctx).await?;
|
let keys = self.load_keys(ctx).await?;
|
||||||
|
|
||||||
async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
|
async fn dump_blob(val: ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
|
||||||
let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
|
let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
|
||||||
let val = Value::des(&buf)?;
|
let val = Value::des(&buf)?;
|
||||||
let desc = match val {
|
let desc = match val {
|
||||||
@@ -905,30 +905,14 @@ impl DeltaLayerInner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for entry in keys {
|
for entry in keys {
|
||||||
let DeltaEntry { key, lsn, val, .. } = entry;
|
let DeltaEntry { key, lsn, val, .. } = entry;
|
||||||
let desc = match dump_blob(&val, ctx).await {
|
let desc = match dump_blob(val, ctx).await {
|
||||||
Ok(desc) => desc,
|
Ok(desc) => desc,
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
format!("ERROR: {err}")
|
format!("ERROR: {err}")
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
println!(" key {key} at {lsn}: {desc}");
|
println!(" key {key} at {lsn}: {desc}");
|
||||||
use crate::pgdatadir_mapping::CHECKPOINT_KEY;
|
|
||||||
use postgres_ffi::CheckPoint;
|
|
||||||
if key == CHECKPOINT_KEY
|
|
||||||
{
|
|
||||||
let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
|
|
||||||
let val = Value::des(&buf)?;
|
|
||||||
match val {
|
|
||||||
Value::Image(img) => {
|
|
||||||
let checkpoint = CheckPoint::decode(&img)?;
|
|
||||||
println!(" CHECKPOINT: {:?}", checkpoint);
|
|
||||||
}
|
|
||||||
Value::WalRecord(_rec) => {
|
|
||||||
format!(" unexpected walrecord for checkpoint key");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -26,11 +26,8 @@ use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
|
|||||||
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
|
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
|
||||||
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
|
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
|
||||||
|
|
||||||
use std::str::FromStr;
|
|
||||||
|
|
||||||
use anyhow::{bail, Context, Result};
|
use anyhow::{bail, Context, Result};
|
||||||
use bytes::{Buf, Bytes, BytesMut};
|
use bytes::{Buf, Bytes, BytesMut};
|
||||||
use hex::FromHex;
|
|
||||||
use tracing::*;
|
use tracing::*;
|
||||||
use utils::failpoint_support;
|
use utils::failpoint_support;
|
||||||
|
|
||||||
@@ -46,10 +43,9 @@ use postgres_ffi::pg_constants;
|
|||||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||||
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
|
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
|
||||||
use postgres_ffi::v14::xlog_utils::*;
|
use postgres_ffi::v14::xlog_utils::*;
|
||||||
use postgres_ffi::v14::{bindings::FullTransactionId, CheckPoint};
|
use postgres_ffi::v14::CheckPoint;
|
||||||
use postgres_ffi::TransactionId;
|
use postgres_ffi::TransactionId;
|
||||||
use postgres_ffi::BLCKSZ;
|
use postgres_ffi::BLCKSZ;
|
||||||
use utils::id::TenantId;
|
|
||||||
use utils::lsn::Lsn;
|
use utils::lsn::Lsn;
|
||||||
|
|
||||||
pub struct WalIngest {
|
pub struct WalIngest {
|
||||||
@@ -106,61 +102,10 @@ impl WalIngest {
|
|||||||
buf.advance(decoded.main_data_offset);
|
buf.advance(decoded.main_data_offset);
|
||||||
|
|
||||||
assert!(!self.checkpoint_modified);
|
assert!(!self.checkpoint_modified);
|
||||||
if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID
|
if self.checkpoint.update_next_xid(decoded.xl_xid) {
|
||||||
&& self.checkpoint.update_next_xid(decoded.xl_xid)
|
|
||||||
{
|
|
||||||
self.checkpoint_modified = true;
|
self.checkpoint_modified = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// BEGIN ONE-OFF HACK
|
|
||||||
//
|
|
||||||
// We had a bug where we incorrectly passed 0 to update_next_xid(). That was
|
|
||||||
// harmless as long as nextXid was < 2^31, because 0 looked like a very old
|
|
||||||
// XID. But once nextXid reaches 2^31, 0 starts to look like a very new XID, and
|
|
||||||
// we incorrectly bumped up nextXid to the next epoch, to value '1:1024'
|
|
||||||
//
|
|
||||||
// We have one known timeline in production where that happened. This is a one-off
|
|
||||||
// fix to fix that damage. The last WAL record on that timeline as of this writing
|
|
||||||
// is this:
|
|
||||||
//
|
|
||||||
// rmgr: Standby len (rec/tot): 50/ 50, tx: 0, lsn: 35A/E32D86D8, prev 35A/E32D86B0, desc: RUNNING_XACTS nextXid 2325447052 latestCompletedXid 2325447051 oldestRunningXid 2325447052
|
|
||||||
//
|
|
||||||
// So on that particular timeline, before that LSN, fix the incorrectly set
|
|
||||||
// nextXid to the nextXid value from that record, plus 1000 to give some safety
|
|
||||||
// margin.
|
|
||||||
|
|
||||||
// For testing this hack, this failpoint temporarily re-introduces the bug that
|
|
||||||
// was fixed
|
|
||||||
fn reintroduce_bug_failpoint_activated() -> bool {
|
|
||||||
fail::fail_point!("reintroduce-nextxid-update-bug", |_| { true });
|
|
||||||
false
|
|
||||||
}
|
|
||||||
if decoded.xl_xid == pg_constants::INVALID_TRANSACTION_ID
|
|
||||||
&& reintroduce_bug_failpoint_activated()
|
|
||||||
&& self.checkpoint.update_next_xid(decoded.xl_xid)
|
|
||||||
{
|
|
||||||
info!(
|
|
||||||
"failpoint: Incorrectly updated nextXid at LSN {} to {}",
|
|
||||||
lsn, self.checkpoint.nextXid.value
|
|
||||||
);
|
|
||||||
self.checkpoint_modified = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.checkpoint.nextXid.value == 4294968320 && // 1::1024, the incorrect value
|
|
||||||
modification.tline.tenant_shard_id.tenant_id == TenantId::from_hex("df254570a4f603805528b46b0d45a76c").unwrap() &&
|
|
||||||
lsn < Lsn::from_str("35A/E32D9000").unwrap() &&
|
|
||||||
!reintroduce_bug_failpoint_activated()
|
|
||||||
{
|
|
||||||
// This is the last nextXid value from the last RUNNING_XACTS record, at the
|
|
||||||
// end of the WAL as of this writing.
|
|
||||||
self.checkpoint.nextXid = FullTransactionId {
|
|
||||||
value: 2325447052 + 1000,
|
|
||||||
};
|
|
||||||
self.checkpoint_modified = true;
|
|
||||||
warn!("nextXid fixed by one-off hack at LSN {}", lsn);
|
|
||||||
}
|
|
||||||
// END ONE-OFF HACK
|
|
||||||
|
|
||||||
match decoded.xl_rmid {
|
match decoded.xl_rmid {
|
||||||
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
|
pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => {
|
||||||
// Heap AM records need some special handling, because they modify VM pages
|
// Heap AM records need some special handling, because they modify VM pages
|
||||||
@@ -385,13 +330,8 @@ impl WalIngest {
|
|||||||
< 0
|
< 0
|
||||||
{
|
{
|
||||||
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
|
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
|
||||||
|
self.checkpoint_modified = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write a new checkpoint key-value pair on every checkpoint record, even
|
|
||||||
// if nothing really changed. Not strictly required, but it seems nice to
|
|
||||||
// have some trace of the checkpoint records in the layer files at the same
|
|
||||||
// LSNs.
|
|
||||||
self.checkpoint_modified = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pg_constants::RM_LOGICALMSG_ID => {
|
pg_constants::RM_LOGICALMSG_ID => {
|
||||||
|
|||||||
@@ -179,18 +179,17 @@ impl super::Api for Api {
|
|||||||
return Ok(Some(role_secret));
|
return Ok(Some(role_secret));
|
||||||
}
|
}
|
||||||
let auth_info = self.do_get_auth_info(ctx, user_info).await?;
|
let auth_info = self.do_get_auth_info(ctx, user_info).await?;
|
||||||
if let Some(project_id) = auth_info.project_id {
|
let project_id = auth_info.project_id.unwrap_or(ep.clone());
|
||||||
if let Some(secret) = &auth_info.secret {
|
if let Some(secret) = &auth_info.secret {
|
||||||
self.caches
|
self.caches
|
||||||
.project_info
|
.project_info
|
||||||
.insert_role_secret(&project_id, ep, user, secret.clone())
|
.insert_role_secret(&project_id, ep, user, secret.clone())
|
||||||
}
|
|
||||||
self.caches.project_info.insert_allowed_ips(
|
|
||||||
&project_id,
|
|
||||||
ep,
|
|
||||||
Arc::new(auth_info.allowed_ips),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
self.caches.project_info.insert_allowed_ips(
|
||||||
|
&project_id,
|
||||||
|
ep,
|
||||||
|
Arc::new(auth_info.allowed_ips),
|
||||||
|
);
|
||||||
// When we just got a secret, we don't need to invalidate it.
|
// When we just got a secret, we don't need to invalidate it.
|
||||||
Ok(auth_info.secret.map(Cached::new_uncached))
|
Ok(auth_info.secret.map(Cached::new_uncached))
|
||||||
}
|
}
|
||||||
@@ -213,16 +212,15 @@ impl super::Api for Api {
|
|||||||
let auth_info = self.do_get_auth_info(ctx, user_info).await?;
|
let auth_info = self.do_get_auth_info(ctx, user_info).await?;
|
||||||
let allowed_ips = Arc::new(auth_info.allowed_ips);
|
let allowed_ips = Arc::new(auth_info.allowed_ips);
|
||||||
let user = &user_info.user;
|
let user = &user_info.user;
|
||||||
if let Some(project_id) = auth_info.project_id {
|
let project_id = auth_info.project_id.unwrap_or(ep.clone());
|
||||||
if let Some(secret) = &auth_info.secret {
|
if let Some(secret) = &auth_info.secret {
|
||||||
self.caches
|
|
||||||
.project_info
|
|
||||||
.insert_role_secret(&project_id, ep, user, secret.clone())
|
|
||||||
}
|
|
||||||
self.caches
|
self.caches
|
||||||
.project_info
|
.project_info
|
||||||
.insert_allowed_ips(&project_id, ep, allowed_ips.clone());
|
.insert_role_secret(&project_id, ep, user, secret.clone())
|
||||||
}
|
}
|
||||||
|
self.caches
|
||||||
|
.project_info
|
||||||
|
.insert_allowed_ips(&project_id, ep, allowed_ips.clone());
|
||||||
Ok(Cached::new_uncached(allowed_ips))
|
Ok(Cached::new_uncached(allowed_ips))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,8 @@ use futures::future::BoxFuture;
|
|||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
use futures::{FutureExt, StreamExt};
|
use futures::{FutureExt, StreamExt};
|
||||||
use remote_storage::RemoteStorageConfig;
|
use remote_storage::RemoteStorageConfig;
|
||||||
|
use safekeeper::control_file::FileStorage;
|
||||||
|
use safekeeper::state::TimelinePersistentState;
|
||||||
use sd_notify::NotifyState;
|
use sd_notify::NotifyState;
|
||||||
use tokio::runtime::Handle;
|
use tokio::runtime::Handle;
|
||||||
use tokio::signal::unix::{signal, SignalKind};
|
use tokio::signal::unix::{signal, SignalKind};
|
||||||
@@ -30,12 +32,12 @@ use safekeeper::defaults::{
|
|||||||
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
|
||||||
DEFAULT_PG_LISTEN_ADDR,
|
DEFAULT_PG_LISTEN_ADDR,
|
||||||
};
|
};
|
||||||
use safekeeper::wal_service;
|
|
||||||
use safekeeper::GlobalTimelines;
|
use safekeeper::GlobalTimelines;
|
||||||
use safekeeper::SafeKeeperConf;
|
use safekeeper::SafeKeeperConf;
|
||||||
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
|
use safekeeper::{broker, WAL_SERVICE_RUNTIME};
|
||||||
use safekeeper::{control_file, BROKER_RUNTIME};
|
use safekeeper::{control_file, BROKER_RUNTIME};
|
||||||
use safekeeper::{http, WAL_REMOVER_RUNTIME};
|
use safekeeper::{http, WAL_REMOVER_RUNTIME};
|
||||||
|
use safekeeper::{json_merge, wal_service};
|
||||||
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
|
use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME};
|
||||||
use safekeeper::{wal_backup, HTTP_RUNTIME};
|
use safekeeper::{wal_backup, HTTP_RUNTIME};
|
||||||
use storage_broker::DEFAULT_ENDPOINT;
|
use storage_broker::DEFAULT_ENDPOINT;
|
||||||
@@ -105,9 +107,6 @@ struct Args {
|
|||||||
/// Do not wait for changes to be written safely to disk. Unsafe.
|
/// Do not wait for changes to be written safely to disk. Unsafe.
|
||||||
#[arg(short, long)]
|
#[arg(short, long)]
|
||||||
no_sync: bool,
|
no_sync: bool,
|
||||||
/// Dump control file at path specified by this argument and exit.
|
|
||||||
#[arg(long)]
|
|
||||||
dump_control_file: Option<Utf8PathBuf>,
|
|
||||||
/// Broker endpoint for storage nodes coordination in the form
|
/// Broker endpoint for storage nodes coordination in the form
|
||||||
/// http[s]://host:port. In case of https schema TLS is connection is
|
/// http[s]://host:port. In case of https schema TLS is connection is
|
||||||
/// established; plaintext otherwise.
|
/// established; plaintext otherwise.
|
||||||
@@ -166,6 +165,21 @@ struct Args {
|
|||||||
/// useful for debugging.
|
/// useful for debugging.
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
current_thread_runtime: bool,
|
current_thread_runtime: bool,
|
||||||
|
/// Dump control file at path specified by this argument and exit.
|
||||||
|
#[arg(long)]
|
||||||
|
dump_control_file: Option<Utf8PathBuf>,
|
||||||
|
/// Patch control file at path specified by this argument and exit.
|
||||||
|
/// Patch is specified in --patch option and imposed over
|
||||||
|
/// control file as per rfc7386.
|
||||||
|
/// Without --write-patched the result is only printed.
|
||||||
|
#[arg(long, verbatim_doc_comment)]
|
||||||
|
patch_control_file: Option<Utf8PathBuf>,
|
||||||
|
/// The patch to apply to control file at --patch-control-file, in JSON.
|
||||||
|
#[arg(long, default_value = None)]
|
||||||
|
patch: Option<String>,
|
||||||
|
/// Write --patch-control-file result back in place.
|
||||||
|
#[arg(long, default_value = "false")]
|
||||||
|
write_patched: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Like PathBufValueParser, but allows empty string.
|
// Like PathBufValueParser, but allows empty string.
|
||||||
@@ -207,7 +221,13 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
if let Some(addr) = args.dump_control_file {
|
if let Some(addr) = args.dump_control_file {
|
||||||
let state = control_file::FileStorage::load_control_file(addr)?;
|
let state = control_file::FileStorage::load_control_file(addr)?;
|
||||||
let json = serde_json::to_string(&state)?;
|
let json = serde_json::to_string(&state)?;
|
||||||
print!("{json}");
|
println!("{json}");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(cfile_path) = args.patch_control_file {
|
||||||
|
let patch = args.patch.ok_or(anyhow::anyhow!("patch is missing"))?;
|
||||||
|
patch_control_file(cfile_path, patch, args.write_patched).await?;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -529,6 +549,26 @@ fn parse_remote_storage(storage_conf: &str) -> anyhow::Result<RemoteStorageConfi
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn patch_control_file(
|
||||||
|
cfile_path: Utf8PathBuf,
|
||||||
|
patch: String,
|
||||||
|
write: bool,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let state = control_file::FileStorage::load_control_file(&cfile_path)?;
|
||||||
|
// serialize to json, impose patch and deserialize back
|
||||||
|
let mut state_json =
|
||||||
|
serde_json::to_value(state).context("failed to serialize state to json")?;
|
||||||
|
let patch_json = serde_json::from_str(&patch).context("failed to parse patch")?;
|
||||||
|
json_merge(&mut state_json, patch_json);
|
||||||
|
let patched_state: TimelinePersistentState =
|
||||||
|
serde_json::from_value(state_json.clone()).context("failed to deserialize patched json")?;
|
||||||
|
println!("{state_json}");
|
||||||
|
if write {
|
||||||
|
FileStorage::do_persist(&patched_state, &cfile_path, true).await?;
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn verify_cli() {
|
fn verify_cli() {
|
||||||
use clap::CommandFactory;
|
use clap::CommandFactory;
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
use anyhow::{bail, ensure, Context, Result};
|
use anyhow::{bail, ensure, Context, Result};
|
||||||
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
|
||||||
use camino::Utf8PathBuf;
|
use camino::{Utf8Path, Utf8PathBuf};
|
||||||
use tokio::fs::{self, File};
|
use tokio::fs::{self, File};
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
@@ -155,6 +155,46 @@ impl FileStorage {
|
|||||||
})?;
|
})?;
|
||||||
Ok(state)
|
Ok(state)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Persist state s to dst_path, optionally fsyncing file.
|
||||||
|
pub async fn do_persist(
|
||||||
|
s: &TimelinePersistentState,
|
||||||
|
dst_path: &Utf8Path,
|
||||||
|
sync: bool,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut f = File::create(&dst_path)
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("failed to create partial control file at: {}", &dst_path))?;
|
||||||
|
let mut buf: Vec<u8> = Vec::new();
|
||||||
|
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
|
||||||
|
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
|
||||||
|
s.ser_into(&mut buf)?;
|
||||||
|
|
||||||
|
// calculate checksum before resize
|
||||||
|
let checksum = crc32c::crc32c(&buf);
|
||||||
|
buf.extend_from_slice(&checksum.to_le_bytes());
|
||||||
|
|
||||||
|
f.write_all(&buf).await.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"failed to write safekeeper state into control file at: {}",
|
||||||
|
dst_path
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
f.flush().await.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"failed to flush safekeeper state into control file at: {}",
|
||||||
|
dst_path
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// fsync the file
|
||||||
|
if sync {
|
||||||
|
f.sync_all()
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("failed to sync partial control file at {}", dst_path))?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Deref for FileStorage {
|
impl Deref for FileStorage {
|
||||||
@@ -167,7 +207,7 @@ impl Deref for FileStorage {
|
|||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl Storage for FileStorage {
|
impl Storage for FileStorage {
|
||||||
/// Persists state durably to the underlying storage.
|
/// Atomically persists state durably to the underlying storage.
|
||||||
///
|
///
|
||||||
/// For a description, see <https://lwn.net/Articles/457667/>.
|
/// For a description, see <https://lwn.net/Articles/457667/>.
|
||||||
async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
|
async fn persist(&mut self, s: &TimelinePersistentState) -> Result<()> {
|
||||||
@@ -175,46 +215,9 @@ impl Storage for FileStorage {
|
|||||||
|
|
||||||
// write data to safekeeper.control.partial
|
// write data to safekeeper.control.partial
|
||||||
let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
|
let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL);
|
||||||
let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
|
FileStorage::do_persist(s, &control_partial_path, !self.conf.no_sync).await?;
|
||||||
format!(
|
|
||||||
"failed to create partial control file at: {}",
|
|
||||||
&control_partial_path
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
let mut buf: Vec<u8> = Vec::new();
|
|
||||||
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?;
|
|
||||||
WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?;
|
|
||||||
s.ser_into(&mut buf)?;
|
|
||||||
|
|
||||||
// calculate checksum before resize
|
|
||||||
let checksum = crc32c::crc32c(&buf);
|
|
||||||
buf.extend_from_slice(&checksum.to_le_bytes());
|
|
||||||
|
|
||||||
control_partial.write_all(&buf).await.with_context(|| {
|
|
||||||
format!(
|
|
||||||
"failed to write safekeeper state into control file at: {}",
|
|
||||||
control_partial_path
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
control_partial.flush().await.with_context(|| {
|
|
||||||
format!(
|
|
||||||
"failed to flush safekeeper state into control file at: {}",
|
|
||||||
control_partial_path
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
// fsync the file
|
|
||||||
if !self.conf.no_sync {
|
|
||||||
control_partial.sync_all().await.with_context(|| {
|
|
||||||
format!(
|
|
||||||
"failed to sync partial control file at {}",
|
|
||||||
control_partial_path
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
|
let control_path = self.timeline_dir.join(CONTROL_FILE_NAME);
|
||||||
|
|
||||||
// rename should be atomic
|
// rename should be atomic
|
||||||
fs::rename(&control_partial_path, &control_path).await?;
|
fs::rename(&control_partial_path, &control_path).await?;
|
||||||
// this sync is not required by any standard but postgres does this (see durable_rename)
|
// this sync is not required by any standard but postgres does this (see durable_rename)
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
use camino::Utf8PathBuf;
|
use camino::Utf8PathBuf;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use remote_storage::RemoteStorageConfig;
|
use remote_storage::RemoteStorageConfig;
|
||||||
|
use serde_json::Value;
|
||||||
use tokio::runtime::Runtime;
|
use tokio::runtime::Runtime;
|
||||||
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -175,3 +176,24 @@ pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
|||||||
.build()
|
.build()
|
||||||
.expect("Failed to create broker runtime")
|
.expect("Failed to create broker runtime")
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/// Merge json b into json a according to
|
||||||
|
/// https://www.rfc-editor.org/rfc/rfc7396
|
||||||
|
/// https://stackoverflow.com/a/54118457/4014587
|
||||||
|
pub fn json_merge(a: &mut Value, b: Value) {
|
||||||
|
if let Value::Object(a) = a {
|
||||||
|
if let Value::Object(b) = b {
|
||||||
|
for (k, v) in b {
|
||||||
|
if v.is_null() {
|
||||||
|
a.remove(&k);
|
||||||
|
} else {
|
||||||
|
json_merge(a.entry(k).or_insert(Value::Null), v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*a = b;
|
||||||
|
}
|
||||||
|
|||||||
@@ -2998,23 +2998,6 @@ class Endpoint(PgProtocol):
|
|||||||
):
|
):
|
||||||
self.stop()
|
self.stop()
|
||||||
|
|
||||||
def log_contains(self, pattern: str) -> Optional[str]:
|
|
||||||
"""Check that the compute log contains a line that matches the given regex"""
|
|
||||||
logfile = self.endpoint_path() / "compute.log"
|
|
||||||
if not logfile.exists():
|
|
||||||
log.warning(f"Skipping log check: {logfile} does not exist")
|
|
||||||
return None
|
|
||||||
|
|
||||||
contains_re = re.compile(pattern)
|
|
||||||
|
|
||||||
with logfile.open("r") as f:
|
|
||||||
for line in f:
|
|
||||||
if contains_re.search(line):
|
|
||||||
# found it!
|
|
||||||
return line
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Checkpoints running endpoint and returns pg_wal size in MB.
|
# Checkpoints running endpoint and returns pg_wal size in MB.
|
||||||
def get_pg_wal_size(self):
|
def get_pg_wal_size(self):
|
||||||
log.info(f'checkpointing at LSN {self.safe_psql("select pg_current_wal_lsn()")[0][0]}')
|
log.info(f'checkpointing at LSN {self.safe_psql("select pg_current_wal_lsn()")[0][0]}')
|
||||||
|
|||||||
@@ -1,20 +1,10 @@
|
|||||||
import json
|
|
||||||
import os
|
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import pytest
|
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||||
from fixtures.log_helper import log
|
|
||||||
from fixtures.neon_fixtures import NeonEnvBuilder, wait_for_wal_insert_lsn
|
|
||||||
from fixtures.pageserver.utils import (
|
|
||||||
wait_for_last_record_lsn,
|
|
||||||
wait_for_upload,
|
|
||||||
)
|
|
||||||
from fixtures.remote_storage import RemoteStorageKind
|
|
||||||
from fixtures.types import Lsn, TenantId, TimelineId
|
|
||||||
from fixtures.utils import query_scalar
|
|
||||||
|
|
||||||
|
|
||||||
|
# Test restarting page server, while safekeeper and compute node keep
|
||||||
|
# running.
|
||||||
def test_next_xid(neon_env_builder: NeonEnvBuilder):
|
def test_next_xid(neon_env_builder: NeonEnvBuilder):
|
||||||
env = neon_env_builder.init_start()
|
env = neon_env_builder.init_start()
|
||||||
|
|
||||||
@@ -62,370 +52,3 @@ def test_next_xid(neon_env_builder: NeonEnvBuilder):
|
|||||||
cur = conn.cursor()
|
cur = conn.cursor()
|
||||||
cur.execute("SELECT count(*) FROM t")
|
cur.execute("SELECT count(*) FROM t")
|
||||||
assert cur.fetchone() == (iterations,)
|
assert cur.fetchone() == (iterations,)
|
||||||
|
|
||||||
|
|
||||||
# Test for a bug we had, where nextXid was incorrectly updated when the
|
|
||||||
# XID counter reached 2 billion. The nextXid tracking logic incorrectly
|
|
||||||
# treated 0 (InvalidTransactionId) as a regular XID, and after reaching
|
|
||||||
# 2 billion, it started to look like a very new XID, which caused nextXid
|
|
||||||
# to be immediately advanced to the next epoch.
|
|
||||||
#
|
|
||||||
def test_import_at_2bil(
|
|
||||||
neon_env_builder: NeonEnvBuilder,
|
|
||||||
test_output_dir: Path,
|
|
||||||
pg_distrib_dir: Path,
|
|
||||||
pg_bin,
|
|
||||||
vanilla_pg,
|
|
||||||
):
|
|
||||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
|
||||||
env = neon_env_builder.init_start()
|
|
||||||
ps_http = env.pageserver.http_client()
|
|
||||||
|
|
||||||
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
|
|
||||||
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
|
|
||||||
psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
|
|
||||||
|
|
||||||
# Reset the vanilla Postgres instance to somewhat before 2 billion transactions.
|
|
||||||
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
|
|
||||||
cmd = [pg_resetwal_path, "--next-transaction-id=2129920000", "-D", str(vanilla_pg.pgdatadir)]
|
|
||||||
pg_bin.run_capture(cmd, env=psql_env)
|
|
||||||
|
|
||||||
vanilla_pg.start()
|
|
||||||
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
|
|
||||||
vanilla_pg.safe_psql(
|
|
||||||
"""create table tt as select 'long string to consume some space' || g
|
|
||||||
from generate_series(1,300000) g"""
|
|
||||||
)
|
|
||||||
assert vanilla_pg.safe_psql("select count(*) from tt") == [(300000,)]
|
|
||||||
vanilla_pg.safe_psql("CREATE TABLE t (t text);")
|
|
||||||
vanilla_pg.safe_psql("INSERT INTO t VALUES ('inserted in vanilla')")
|
|
||||||
|
|
||||||
branch_name = "import_from_vanilla"
|
|
||||||
tenant = TenantId.generate()
|
|
||||||
timeline = TimelineId.generate()
|
|
||||||
|
|
||||||
env.pageserver.tenant_create(tenant)
|
|
||||||
|
|
||||||
# Take basebackup
|
|
||||||
basebackup_dir = os.path.join(test_output_dir, "basebackup")
|
|
||||||
base_tar = os.path.join(basebackup_dir, "base.tar")
|
|
||||||
wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
|
|
||||||
os.mkdir(basebackup_dir)
|
|
||||||
vanilla_pg.safe_psql("CHECKPOINT")
|
|
||||||
pg_bin.run(
|
|
||||||
[
|
|
||||||
"pg_basebackup",
|
|
||||||
"-F",
|
|
||||||
"tar",
|
|
||||||
"-d",
|
|
||||||
vanilla_pg.connstr(),
|
|
||||||
"-D",
|
|
||||||
basebackup_dir,
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Get start_lsn and end_lsn
|
|
||||||
with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
|
|
||||||
manifest = json.load(f)
|
|
||||||
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
|
|
||||||
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
|
|
||||||
|
|
||||||
def import_tar(base, wal):
|
|
||||||
env.neon_cli.raw_cli(
|
|
||||||
[
|
|
||||||
"timeline",
|
|
||||||
"import",
|
|
||||||
"--tenant-id",
|
|
||||||
str(tenant),
|
|
||||||
"--timeline-id",
|
|
||||||
str(timeline),
|
|
||||||
"--node-name",
|
|
||||||
branch_name,
|
|
||||||
"--base-lsn",
|
|
||||||
start_lsn,
|
|
||||||
"--base-tarfile",
|
|
||||||
base,
|
|
||||||
"--end-lsn",
|
|
||||||
end_lsn,
|
|
||||||
"--wal-tarfile",
|
|
||||||
wal,
|
|
||||||
"--pg-version",
|
|
||||||
env.pg_version,
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Importing correct backup works
|
|
||||||
import_tar(base_tar, wal_tar)
|
|
||||||
wait_for_last_record_lsn(ps_http, tenant, timeline, Lsn(end_lsn))
|
|
||||||
|
|
||||||
endpoint = env.endpoints.create_start(
|
|
||||||
branch_name,
|
|
||||||
endpoint_id="ep-import_from_vanilla",
|
|
||||||
tenant_id=tenant,
|
|
||||||
config_lines=[
|
|
||||||
"log_autovacuum_min_duration = 0",
|
|
||||||
"autovacuum_naptime='5 s'",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
assert endpoint.safe_psql("select count(*) from t") == [(1,)]
|
|
||||||
|
|
||||||
conn = endpoint.connect()
|
|
||||||
cur = conn.cursor()
|
|
||||||
|
|
||||||
# Install extension containing function needed for test
|
|
||||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
|
||||||
|
|
||||||
# Advance nextXid close to 2 billion XIDs
|
|
||||||
while True:
|
|
||||||
xid = int(query_scalar(cur, "SELECT txid_current()"))
|
|
||||||
log.info(f"xid now {xid}")
|
|
||||||
# Consume 10k transactons at a time until we get to 2^31 - 200k
|
|
||||||
if xid < 2 * 1024 * 1024 * 1024 - 100000:
|
|
||||||
cur.execute("select test_consume_xids(50000);")
|
|
||||||
elif xid < 2 * 1024 * 1024 * 1024 - 10000:
|
|
||||||
cur.execute("select test_consume_xids(5000);")
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
|
|
||||||
# Run a bunch of real INSERTs to cross over the 2 billion mark
|
|
||||||
# Use a begin-exception block to have a separate sub-XID for each insert.
|
|
||||||
cur.execute(
|
|
||||||
"""
|
|
||||||
do $$
|
|
||||||
begin
|
|
||||||
for i in 1..10000 loop
|
|
||||||
-- Use a begin-exception block to generate a new subtransaction on each iteration
|
|
||||||
begin
|
|
||||||
insert into t values (i);
|
|
||||||
exception when others then
|
|
||||||
raise 'not expected %', sqlerrm;
|
|
||||||
end;
|
|
||||||
end loop;
|
|
||||||
end;
|
|
||||||
$$;
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
# A checkpoint writes a WAL record with xl_xid=0. Many other WAL
|
|
||||||
# records would have the same effect.
|
|
||||||
cur.execute("checkpoint")
|
|
||||||
|
|
||||||
# wait until pageserver receives that data
|
|
||||||
wait_for_wal_insert_lsn(env, endpoint, tenant, timeline)
|
|
||||||
|
|
||||||
# Restart endpoint
|
|
||||||
endpoint.stop()
|
|
||||||
endpoint.start()
|
|
||||||
|
|
||||||
conn = endpoint.connect()
|
|
||||||
cur = conn.cursor()
|
|
||||||
cur.execute("SELECT count(*) from t")
|
|
||||||
assert cur.fetchone() == (10000 + 1,)
|
|
||||||
|
|
||||||
|
|
||||||
# This is a followup to the test_import_at_2bil test.
|
|
||||||
#
|
|
||||||
# Use a failpoint to reintroduce the bug that test_import_at_2bil also
|
|
||||||
# tests. Then, after the damage has been done, clear the failpoint to
|
|
||||||
# fix the bug. Check that the one-off hack that we added for a particular
|
|
||||||
# timeline that hit this in production fixes the broken timeline.
|
|
||||||
def test_one_off_hack_for_nextxid_bug(
|
|
||||||
neon_env_builder: NeonEnvBuilder,
|
|
||||||
test_output_dir: Path,
|
|
||||||
pg_distrib_dir: Path,
|
|
||||||
pg_bin,
|
|
||||||
vanilla_pg,
|
|
||||||
):
|
|
||||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
|
||||||
env = neon_env_builder.init_start()
|
|
||||||
ps_http = env.pageserver.http_client()
|
|
||||||
|
|
||||||
env.pageserver.allowed_errors.append(".*nextXid fixed by one-off hack.*")
|
|
||||||
|
|
||||||
# We begin with the old bug still present, to create a broken timeline
|
|
||||||
ps_http.configure_failpoints(("reintroduce-nextxid-update-bug", "return(true)"))
|
|
||||||
|
|
||||||
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
|
|
||||||
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
|
|
||||||
psql_env = {"LD_LIBRARY_PATH": str(pg_distrib_dir / "lib")}
|
|
||||||
|
|
||||||
# Reset the vanilla Postgres instance to somewhat before 2 billion transactions,
|
|
||||||
# and around the same LSN as with the production timeline.
|
|
||||||
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, "pg_resetwal")
|
|
||||||
cmd = [
|
|
||||||
pg_resetwal_path,
|
|
||||||
"--next-transaction-id=2129920000",
|
|
||||||
"-l",
|
|
||||||
"000000010000035A000000E0",
|
|
||||||
"-D",
|
|
||||||
str(vanilla_pg.pgdatadir),
|
|
||||||
]
|
|
||||||
pg_bin.run_capture(cmd, env=psql_env)
|
|
||||||
|
|
||||||
vanilla_pg.start()
|
|
||||||
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
|
|
||||||
vanilla_pg.safe_psql(
|
|
||||||
"""create table tt as select 'long string to consume some space' || g
|
|
||||||
from generate_series(1,300000) g"""
|
|
||||||
)
|
|
||||||
assert vanilla_pg.safe_psql("select count(*) from tt") == [(300000,)]
|
|
||||||
vanilla_pg.safe_psql("CREATE TABLE t (t text);")
|
|
||||||
vanilla_pg.safe_psql("INSERT INTO t VALUES ('inserted in vanilla')")
|
|
||||||
|
|
||||||
branch_name = "import_from_vanilla"
|
|
||||||
# This is the tenant/timeline that the one-off hack targets
|
|
||||||
tenant = "df254570a4f603805528b46b0d45a76c"
|
|
||||||
timeline = TimelineId.generate()
|
|
||||||
|
|
||||||
env.pageserver.tenant_create(tenant)
|
|
||||||
|
|
||||||
# Take basebackup
|
|
||||||
basebackup_dir = os.path.join(test_output_dir, "basebackup")
|
|
||||||
base_tar = os.path.join(basebackup_dir, "base.tar")
|
|
||||||
wal_tar = os.path.join(basebackup_dir, "pg_wal.tar")
|
|
||||||
os.mkdir(basebackup_dir)
|
|
||||||
vanilla_pg.safe_psql("CHECKPOINT")
|
|
||||||
pg_bin.run(
|
|
||||||
[
|
|
||||||
"pg_basebackup",
|
|
||||||
"-F",
|
|
||||||
"tar",
|
|
||||||
"-d",
|
|
||||||
vanilla_pg.connstr(),
|
|
||||||
"-D",
|
|
||||||
basebackup_dir,
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Get start_lsn and end_lsn
|
|
||||||
with open(os.path.join(basebackup_dir, "backup_manifest")) as f:
|
|
||||||
manifest = json.load(f)
|
|
||||||
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
|
|
||||||
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
|
|
||||||
|
|
||||||
def import_tar(base, wal):
|
|
||||||
env.neon_cli.raw_cli(
|
|
||||||
[
|
|
||||||
"timeline",
|
|
||||||
"import",
|
|
||||||
"--tenant-id",
|
|
||||||
str(tenant),
|
|
||||||
"--timeline-id",
|
|
||||||
str(timeline),
|
|
||||||
"--node-name",
|
|
||||||
branch_name,
|
|
||||||
"--base-lsn",
|
|
||||||
start_lsn,
|
|
||||||
"--base-tarfile",
|
|
||||||
base,
|
|
||||||
"--end-lsn",
|
|
||||||
end_lsn,
|
|
||||||
"--wal-tarfile",
|
|
||||||
wal,
|
|
||||||
"--pg-version",
|
|
||||||
env.pg_version,
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Importing correct backup works
|
|
||||||
import_tar(base_tar, wal_tar)
|
|
||||||
wait_for_last_record_lsn(ps_http, tenant, timeline, Lsn(end_lsn))
|
|
||||||
|
|
||||||
endpoint = env.endpoints.create_start(
|
|
||||||
branch_name,
|
|
||||||
endpoint_id="ep-import_from_vanilla",
|
|
||||||
tenant_id=tenant,
|
|
||||||
config_lines=[
|
|
||||||
"log_autovacuum_min_duration = 0",
|
|
||||||
"autovacuum_naptime='5 s'",
|
|
||||||
],
|
|
||||||
)
|
|
||||||
assert endpoint.safe_psql("select count(*) from t") == [(1,)]
|
|
||||||
|
|
||||||
conn = endpoint.connect()
|
|
||||||
cur = conn.cursor()
|
|
||||||
|
|
||||||
# Install extension containing function needed for test
|
|
||||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
|
||||||
|
|
||||||
# Advance nextXid to the target XID, which is somewhat above the 2
|
|
||||||
# billion mark.
|
|
||||||
while True:
|
|
||||||
xid = int(query_scalar(cur, "SELECT txid_current()"))
|
|
||||||
log.info(f"xid now {xid}")
|
|
||||||
# Consume 10k transactons at a time until we get to 2^31 - 200k
|
|
||||||
if xid < (2325447052 - 100000):
|
|
||||||
cur.execute("select test_consume_xids(50000);")
|
|
||||||
elif xid < 2325447052 - 10000:
|
|
||||||
cur.execute("select test_consume_xids(5000);")
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
|
|
||||||
# Run a bunch of real INSERTs to cross over the 2 billion mark
|
|
||||||
# Use a begin-exception block to have a separate sub-XID for each insert.
|
|
||||||
cur.execute(
|
|
||||||
"""
|
|
||||||
do $$
|
|
||||||
begin
|
|
||||||
for i in 1..10000 loop
|
|
||||||
-- Use a begin-exception block to generate a new subtransaction on each iteration
|
|
||||||
begin
|
|
||||||
insert into t values (i);
|
|
||||||
exception when others then
|
|
||||||
raise 'not expected %', sqlerrm;
|
|
||||||
end;
|
|
||||||
end loop;
|
|
||||||
end;
|
|
||||||
$$;
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
# A checkpoint writes a WAL record with xl_xid=0. Many other WAL
|
|
||||||
# records would have the same effect.
|
|
||||||
cur.execute("checkpoint")
|
|
||||||
|
|
||||||
# Ok, the nextXid in the pageserver at this LSN should now be incorrectly
|
|
||||||
# set to 1:1024. Remember this LSN.
|
|
||||||
broken_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
|
|
||||||
|
|
||||||
# Ensure that the broken checkpoint data has reached permanent storage
|
|
||||||
ps_http.timeline_checkpoint(tenant, timeline)
|
|
||||||
wait_for_upload(ps_http, tenant, timeline, broken_lsn)
|
|
||||||
|
|
||||||
# Now fix the bug, and generate some WAL with XIDs
|
|
||||||
ps_http.configure_failpoints(("reintroduce-nextxid-update-bug", "off"))
|
|
||||||
cur.execute("INSERT INTO t VALUES ('after fix')")
|
|
||||||
fixed_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
|
|
||||||
|
|
||||||
log.info(f"nextXid was broken by {broken_lsn}, and fixed again by {fixed_lsn}")
|
|
||||||
|
|
||||||
# Stop the original endpoint, we don't need it anymore.
|
|
||||||
endpoint.stop()
|
|
||||||
|
|
||||||
# Test that we cannot start a new endpoint at the broken LSN.
|
|
||||||
env.neon_cli.create_branch(
|
|
||||||
"at-broken-lsn", branch_name, ancestor_start_lsn=broken_lsn, tenant_id=tenant
|
|
||||||
)
|
|
||||||
endpoint_broken = env.endpoints.create(
|
|
||||||
"at-broken-lsn",
|
|
||||||
endpoint_id="ep-at-broken-lsn",
|
|
||||||
tenant_id=tenant,
|
|
||||||
)
|
|
||||||
with pytest.raises(RuntimeError, match="Postgres exited unexpectedly with code 1"):
|
|
||||||
endpoint_broken.start()
|
|
||||||
assert endpoint_broken.log_contains(
|
|
||||||
'Could not open file "pg_xact/0000": No such file or directory'
|
|
||||||
)
|
|
||||||
|
|
||||||
# But after the bug was fixed, the one-off hack fixed the timeline,
|
|
||||||
# and a later LSN works.
|
|
||||||
env.neon_cli.create_branch(
|
|
||||||
"at-fixed-lsn", branch_name, ancestor_start_lsn=fixed_lsn, tenant_id=tenant
|
|
||||||
)
|
|
||||||
endpoint_fixed = env.endpoints.create_start(
|
|
||||||
"at-fixed-lsn", endpoint_id="ep-at-fixed-lsn", tenant_id=tenant
|
|
||||||
)
|
|
||||||
|
|
||||||
conn = endpoint_fixed.connect()
|
|
||||||
cur = conn.cursor()
|
|
||||||
cur.execute("SELECT count(*) from t")
|
|
||||||
# One "inserted in vanilla" row, 10000 in the DO-loop, and one "after fix" row
|
|
||||||
assert cur.fetchone() == (1 + 10000 + 1,)
|
|
||||||
|
|||||||
@@ -1,4 +1,3 @@
|
|||||||
import pytest
|
|
||||||
from fixtures.log_helper import log
|
from fixtures.log_helper import log
|
||||||
from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
|
from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
|
||||||
|
|
||||||
@@ -118,8 +117,6 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
|
|||||||
# Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK
|
# Test that the ALL_FROZEN VM bit is cleared correctly at a HEAP_LOCK
|
||||||
# record.
|
# record.
|
||||||
#
|
#
|
||||||
# FIXME: This test is broken
|
|
||||||
@pytest.mark.skip("See https://github.com/neondatabase/neon/pull/6412#issuecomment-1902072541")
|
|
||||||
def test_vm_bit_clear_on_heap_lock(neon_simple_env: NeonEnv):
|
def test_vm_bit_clear_on_heap_lock(neon_simple_env: NeonEnv):
|
||||||
env = neon_simple_env
|
env = neon_simple_env
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user