[BRC-2905] Feed back PS-detected data corruption signals to SK and PG… (#12748)

… walproposer (#895)

Data corruptions are typically detected on the pageserver side when it
replays WAL records. However, since PS doesn't synchronously replay WAL
records as they are being ingested through safekeepers, we need some
extra plumbing to feed information about pageserver-detected corruptions
during compaction (and/or WAL redo in general) back to SK and PG for
proper action.

We don't yet know what actions PG/SK should take upon receiving the
signal, but we should have the detection and feedback in place.

Add an extra `corruption_detected` field to the `PageserverFeedback`
message that is sent from PS -> SK -> PG. It's a boolean value that is
set to true when PS detects a "critical error" that signals data
corruption, and it's sent in all `PageserverFeedback` messages. Upon
receiving this signal, the safekeeper raises a
`safekeeper_ps_corruption_detected` gauge metric (value set to 1). The
safekeeper then forwards this signal to PG where a
`ps_corruption_detected` gauge metric (value also set to 1) is raised in
the `neon_perf_counters` view.

Added an integration test in
`test_compaction.py::test_ps_corruption_detection_feedback` that
confirms that the safekeeper and PG can receive the data corruption
signal in the `PageserverFeedback` message in a simulated data
corruption.

## Problem

## Summary of changes

---------

Co-authored-by: William Huang <william.huang@databricks.com>
This commit is contained in:
Suhas Thalanki
2025-07-29 16:40:07 -04:00
committed by GitHub
parent 7cd0066212
commit 07c3cfd2a0
18 changed files with 208 additions and 1 deletions

View File

@@ -34,13 +34,16 @@ macro_rules! critical {
#[macro_export]
macro_rules! critical_timeline {
($tenant_shard_id:expr, $timeline_id:expr, $($arg:tt)*) => {{
($tenant_shard_id:expr, $timeline_id:expr, $corruption_detected:expr, $($arg:tt)*) => {{
if cfg!(debug_assertions) {
panic!($($arg)*);
}
// Increment both metrics
$crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
$crate::logging::HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC.inc(&$tenant_shard_id.to_string(), &$timeline_id.to_string());
if let Some(c) = $corruption_detected.as_ref() {
c.store(true, std::sync::atomic::Ordering::Relaxed);
}
let backtrace = std::backtrace::Backtrace::capture();
tracing::error!("CRITICAL: [tenant_shard_id: {}, timeline_id: {}] {}\n{backtrace}",
$tenant_shard_id, $timeline_id, format!($($arg)*));

View File

@@ -32,6 +32,9 @@ pub struct PageserverFeedback {
pub replytime: SystemTime,
/// Used to track feedbacks from different shards. Always zero for unsharded tenants.
pub shard_number: u32,
/// If true, the pageserver has detected corruption and the safekeeper and postgres
/// should stop sending WAL.
pub corruption_detected: bool,
}
impl PageserverFeedback {
@@ -43,6 +46,7 @@ impl PageserverFeedback {
disk_consistent_lsn: Lsn::INVALID,
replytime: *PG_EPOCH,
shard_number: 0,
corruption_detected: false,
}
}
@@ -101,6 +105,13 @@ impl PageserverFeedback {
buf.put_u32(self.shard_number);
}
if self.corruption_detected {
nkeys += 1;
buf.put_slice(b"corruption_detected\0");
buf.put_i32(1);
buf.put_u8(1);
}
buf[buf_ptr] = nkeys;
}
@@ -147,6 +158,11 @@ impl PageserverFeedback {
assert_eq!(len, 4);
rf.shard_number = buf.get_u32();
}
b"corruption_detected" => {
let len = buf.get_i32();
assert_eq!(len, 1);
rf.corruption_detected = buf.get_u8() != 0;
}
_ => {
let len = buf.get_i32();
warn!(
@@ -206,6 +222,26 @@ mod tests {
assert_eq!(rf, rf_parsed);
}
// Test that databricks-specific fields added to the PageserverFeedback message are serialized
// and deserialized correctly, in addition to the existing fields from upstream.
#[test]
fn test_replication_feedback_databricks_fields() {
let mut rf = PageserverFeedback::empty();
rf.current_timeline_size = 12345678;
rf.last_received_lsn = Lsn(23456789);
rf.disk_consistent_lsn = Lsn(34567890);
rf.remote_consistent_lsn = Lsn(45678901);
rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
rf.shard_number = 1;
rf.corruption_detected = true;
let mut data = BytesMut::new();
rf.serialize(&mut data);
let rf_parsed = PageserverFeedback::parse(data.freeze());
assert_eq!(rf, rf_parsed);
}
#[test]
fn test_replication_feedback_unknown_key() {
let mut rf = PageserverFeedback::empty();

View File

@@ -426,6 +426,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
remote_consistent_lsn: 0,
replytime: 0,
shard_number: 0,
corruption_detected: false,
};
let empty_wal_rate_limiter = crate::bindings::WalRateLimiter {

View File

@@ -397,6 +397,11 @@ pub struct Timeline {
/// If true, the last compaction failed.
compaction_failed: AtomicBool,
/// Begin Hadron: If true, the pageserver has likely detected data corruption in the timeline.
/// We need to feed this information back to the Safekeeper and postgres for them to take the
/// appropriate action.
corruption_detected: AtomicBool,
/// Notifies the tenant compaction loop that there is pending L0 compaction work.
l0_compaction_trigger: Arc<Notify>,
@@ -3310,6 +3315,7 @@ impl Timeline {
compaction_lock: tokio::sync::Mutex::default(),
compaction_failed: AtomicBool::default(),
corruption_detected: AtomicBool::default(),
l0_compaction_trigger: resources.l0_compaction_trigger,
gc_lock: tokio::sync::Mutex::default(),
@@ -6004,6 +6010,17 @@ impl Timeline {
)))
});
// Begin Hadron
//
fail_point!("create-image-layer-fail-simulated-corruption", |_| {
self.corruption_detected
.store(true, std::sync::atomic::Ordering::Relaxed);
Err(CreateImageLayersError::Other(anyhow::anyhow!(
"failpoint create-image-layer-fail-simulated-corruption"
)))
});
// End Hadron
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf.get_vectored_concurrent_io,
self.gate
@@ -7149,6 +7166,7 @@ impl Timeline {
critical_timeline!(
self.tenant_shard_id,
self.timeline_id,
Some(&self.corruption_detected),
"walredo failure during page reconstruction: {err:?}"
);
}

View File

@@ -1397,6 +1397,7 @@ impl Timeline {
critical_timeline!(
self.tenant_shard_id,
self.timeline_id,
Some(&self.corruption_detected),
"missing key during compaction: {err:?}"
);
}
@@ -1441,6 +1442,7 @@ impl Timeline {
critical_timeline!(
self.tenant_shard_id,
self.timeline_id,
Some(&self.corruption_detected),
"could not compact, repartitioning keyspace failed: {e:?}"
);
}

View File

@@ -365,6 +365,7 @@ pub(super) async fn handle_walreceiver_connection(
critical_timeline!(
timeline.tenant_shard_id,
timeline.timeline_id,
Some(&timeline.corruption_detected),
"{msg}"
);
return Err(WalReceiverError::Other(anyhow!(msg)));
@@ -382,6 +383,7 @@ pub(super) async fn handle_walreceiver_connection(
critical_timeline!(
timeline.tenant_shard_id,
timeline.timeline_id,
Some(&timeline.corruption_detected),
"{msg}"
);
return Err(WalReceiverError::Other(anyhow!(msg)));
@@ -455,6 +457,7 @@ pub(super) async fn handle_walreceiver_connection(
critical_timeline!(
timeline.tenant_shard_id,
timeline.timeline_id,
Some(&timeline.corruption_detected),
"{err:?}"
);
}
@@ -586,6 +589,9 @@ pub(super) async fn handle_walreceiver_connection(
remote_consistent_lsn,
replytime: ts,
shard_number: timeline.tenant_shard_id.shard_number.0 as u32,
corruption_detected: timeline
.corruption_detected
.load(std::sync::atomic::Ordering::Relaxed),
};
debug!("neon_status_update {status_update:?}");

View File

@@ -23,6 +23,7 @@
use std::backtrace::Backtrace;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant, SystemTime};
@@ -422,6 +423,8 @@ impl WalIngest {
critical_timeline!(
modification.tline.tenant_shard_id,
modification.tline.timeline_id,
// Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
None::<&AtomicBool>,
"clear_vm_bits for unknown VM relation {vm_rel}"
);
return Ok(());
@@ -431,6 +434,8 @@ impl WalIngest {
critical_timeline!(
modification.tline.tenant_shard_id,
modification.tline.timeline_id,
// Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
None::<&AtomicBool>,
"new_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
);
new_vm_blk = None;
@@ -441,6 +446,8 @@ impl WalIngest {
critical_timeline!(
modification.tline.tenant_shard_id,
modification.tline.timeline_id,
// Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
None::<&AtomicBool>,
"old_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
);
old_vm_blk = None;

View File

@@ -45,6 +45,7 @@ DatabricksMetricsShmemInit(void)
pg_atomic_init_u32(&databricks_metrics_shared->index_corruption_count, 0);
pg_atomic_init_u32(&databricks_metrics_shared->data_corruption_count, 0);
pg_atomic_init_u32(&databricks_metrics_shared->internal_error_count, 0);
pg_atomic_init_u32(&databricks_metrics_shared->ps_corruption_detected, 0);
}
}
/* END_HADRON */
@@ -440,6 +441,7 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
{"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)},
{"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)},
{"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)},
{"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)},
{NULL, false, 0, 0},
};
for (int i = 0; databricks_metrics[i].name != NULL; i++)

View File

@@ -183,6 +183,7 @@ typedef struct
pg_atomic_uint32 index_corruption_count;
pg_atomic_uint32 data_corruption_count;
pg_atomic_uint32 internal_error_count;
pg_atomic_uint32 ps_corruption_detected;
} databricks_metrics;
extern databricks_metrics *databricks_metrics_shared;

View File

@@ -1887,6 +1887,12 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
ps_feedback->shard_number = pq_getmsgint(reply_message, sizeof(uint32));
psfeedback_log("%u", key, ps_feedback->shard_number);
}
else if (strcmp(key, "corruption_detected") == 0)
{
Assert(value_len == 1);
ps_feedback->corruption_detected = pq_getmsgbyte(reply_message) != 0;
psfeedback_log("%s", key, ps_feedback->corruption_detected ? "true" : "false");
}
else
{
/*

View File

@@ -374,6 +374,8 @@ typedef struct PageserverFeedback
XLogRecPtr remote_consistent_lsn;
TimestampTz replytime;
uint32 shard_number;
/* true if the pageserver has detected data corruption in the timeline */
bool corruption_detected;
} PageserverFeedback;
/* BEGIN_HADRON */

View File

@@ -49,6 +49,7 @@
#include "libpqwalproposer.h"
#include "neon.h"
#include "neon_perf_counters.h"
#include "neon_walreader.h"
#include "walproposer.h"
@@ -741,6 +742,11 @@ record_pageserver_feedback(PageserverFeedback *ps_feedback, shardno_t num_shards
Assert(ps_feedback->shard_number < MAX_SHARDS);
Assert(ps_feedback->shard_number < num_shards);
// Begin Hadron: Record any corruption signal from the pageserver first.
if (ps_feedback->corruption_detected) {
pg_atomic_write_u32(&databricks_metrics_shared->ps_corruption_detected, 1);
}
SpinLockAcquire(&walprop_shared->mutex);
// Hadron: Update the num_shards from the source-of-truth (shard map) lazily when we receive

View File

@@ -387,6 +387,7 @@ pub fn get_filesystem_usage(path: &std::path::Path) -> u64 {
critical_timeline!(
placeholder_ttid.tenant_id,
placeholder_ttid.timeline_id,
None::<&AtomicBool>,
"Global disk usage watcher failed to read filesystem usage: {:?}",
e
);

View File

@@ -518,6 +518,7 @@ pub async fn time_io_closure<E: Into<anyhow::Error>>(
pub struct FullTimelineInfo {
pub ttid: TenantTimelineId,
pub ps_feedback_count: u64,
pub ps_corruption_detected: bool,
pub last_ps_feedback: PageserverFeedback,
pub wal_backup_active: bool,
pub timeline_is_active: bool,
@@ -547,6 +548,7 @@ pub struct TimelineCollector {
ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
ps_feedback_count: GenericGaugeVec<AtomicU64>,
ps_corruption_detected: IntGaugeVec,
timeline_active: GenericGaugeVec<AtomicU64>,
wal_backup_active: GenericGaugeVec<AtomicU64>,
connected_computes: IntGaugeVec,
@@ -654,6 +656,15 @@ impl TimelineCollector {
)
.unwrap();
let ps_corruption_detected = IntGaugeVec::new(
Opts::new(
"safekeeper_ps_corruption_detected",
"1 if corruption was detected in the timeline according to feedback from the pageserver, 0 otherwise",
),
&["tenant_id", "timeline_id"],
)
.unwrap();
let timeline_active = GenericGaugeVec::new(
Opts::new(
"safekeeper_timeline_active",
@@ -774,6 +785,7 @@ impl TimelineCollector {
ps_last_received_lsn,
feedback_last_time_seconds,
ps_feedback_count,
ps_corruption_detected,
timeline_active,
wal_backup_active,
connected_computes,
@@ -892,6 +904,9 @@ impl Collector for TimelineCollector {
self.ps_feedback_count
.with_label_values(labels)
.set(tli.ps_feedback_count);
self.ps_corruption_detected
.with_label_values(labels)
.set(tli.ps_corruption_detected as i64);
if let Ok(unix_time) = tli
.last_ps_feedback
.replytime
@@ -925,6 +940,7 @@ impl Collector for TimelineCollector {
mfs.extend(self.ps_last_received_lsn.collect());
mfs.extend(self.feedback_last_time_seconds.collect());
mfs.extend(self.ps_feedback_count.collect());
mfs.extend(self.ps_corruption_detected.collect());
mfs.extend(self.timeline_active.collect());
mfs.extend(self.wal_backup_active.collect());
mfs.extend(self.connected_computes.collect());

View File

@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use anyhow::{Context, anyhow};
@@ -305,6 +306,9 @@ impl InterpretedWalReader {
critical_timeline!(
ttid.tenant_id,
ttid.timeline_id,
// Hadron: The corruption flag is only used in PS so that it can feed this information back to SKs.
// We do not use these flags in SKs.
None::<&AtomicBool>,
"failed to read WAL record: {err:?}"
);
}
@@ -375,6 +379,9 @@ impl InterpretedWalReader {
critical_timeline!(
ttid.tenant_id,
ttid.timeline_id,
// Hadron: The corruption flag is only used in PS so that it can feed this information back to SKs.
// We do not use these flags in SKs.
None::<&AtomicBool>,
"failed to decode WAL record: {err:?}"
);
}

View File

@@ -55,6 +55,7 @@ pub struct WalSenders {
pub struct WalSendersTimelineMetricValues {
pub ps_feedback_counter: u64,
pub ps_corruption_detected: bool,
pub last_ps_feedback: PageserverFeedback,
pub interpreted_wal_reader_tasks: usize,
}
@@ -193,6 +194,7 @@ impl WalSenders {
WalSendersTimelineMetricValues {
ps_feedback_counter: shared.ps_feedback_counter,
ps_corruption_detected: shared.ps_corruption_detected,
last_ps_feedback: shared.last_ps_feedback,
interpreted_wal_reader_tasks,
}
@@ -209,6 +211,9 @@ impl WalSenders {
*shared.get_slot_mut(id).get_mut_feedback() = ReplicationFeedback::Pageserver(*feedback);
shared.last_ps_feedback = *feedback;
shared.ps_feedback_counter += 1;
if feedback.corruption_detected {
shared.ps_corruption_detected = true;
}
drop(shared);
RECEIVED_PS_FEEDBACKS.inc();
@@ -278,6 +283,9 @@ struct WalSendersShared {
last_ps_feedback: PageserverFeedback,
// total counter of pageserver feedbacks received
ps_feedback_counter: u64,
// Hadron: true iff we received a pageserver feedback that incidated
// data corruption in the timeline
ps_corruption_detected: bool,
slots: Vec<Option<WalSenderState>>,
}
@@ -328,6 +336,7 @@ impl WalSendersShared {
agg_standby_feedback: StandbyFeedback::empty(),
last_ps_feedback: PageserverFeedback::empty(),
ps_feedback_counter: 0,
ps_corruption_detected: false,
slots: Vec::new(),
}
}

View File

@@ -839,6 +839,7 @@ impl Timeline {
let WalSendersTimelineMetricValues {
ps_feedback_counter,
ps_corruption_detected,
last_ps_feedback,
interpreted_wal_reader_tasks,
} = self.walsenders.info_for_metrics();
@@ -847,6 +848,7 @@ impl Timeline {
Some(FullTimelineInfo {
ttid: self.ttid,
ps_feedback_count: ps_feedback_counter,
ps_corruption_detected,
last_ps_feedback,
wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
timeline_is_active: self.broker_active.load(Ordering::Relaxed),

View File

@@ -863,6 +863,88 @@ def test_pageserver_compaction_circuit_breaker(neon_env_builder: NeonEnvBuilder)
assert not env.pageserver.log_contains(".*Circuit breaker failure ended.*")
@pytest.mark.skip(reason="Lakebase mode")
def test_ps_corruption_detection_feedback(neon_env_builder: NeonEnvBuilder):
"""
Test that when the pageserver detects corruption during image layer creation,
it sends corruption feedback to the safekeeper which gets recorded in its
safekeeper_ps_corruption_detected metric.
"""
# Configure tenant with aggressive compaction settings to easily trigger compaction
TENANT_CONF = {
# Small checkpoint distance to create many layers
"checkpoint_distance": 1024 * 128,
# Compact small layers
"compaction_target_size": 1024 * 128,
# Create image layers eagerly
"image_creation_threshold": 1,
"image_layer_creation_check_threshold": 0,
# Force frequent compaction
"compaction_period": "1s",
}
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
# We are simulating compaction failures so we should allow these error messages.
env.pageserver.allowed_errors.append(".*Compaction failed.*")
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
pageserver_http = env.pageserver.http_client()
workload = Workload(env, tenant_id, timeline_id)
workload.init()
# Enable the failpoint that will cause image layer creation to fail due to a (simulated) detected
# corruption.
pageserver_http.configure_failpoints(("create-image-layer-fail-simulated-corruption", "return"))
# Write some data to trigger compaction and image layer creation
log.info("Writing data to trigger compaction...")
workload.write_rows(1024 * 64, upload=False)
workload.write_rows(1024 * 64, upload=False)
# Returns True if the corruption signal from PS is propagated to the SK according to the "safekeeper_ps_corruption_detected" metric.
# Raises an exception otherwise.
def check_corruption_signal_propagated_to_sk():
# Get metrics from all safekeepers
for sk in env.safekeepers:
sk_metrics = sk.http_client().get_metrics()
# Look for our corruption detected metric with the right tenant and timeline
corruption_metrics = sk_metrics.query_all("safekeeper_ps_corruption_detected")
for metric in corruption_metrics:
# Check if there's a metric for our tenant and timeline that has value 1
if (
metric.labels.get("tenant_id") == str(tenant_id)
and metric.labels.get("timeline_id") == str(timeline_id)
and metric.value == 1
):
log.info(f"Corruption detected by safekeeper {sk.id}: {metric}")
return True
raise Exception("Corruption detection feedback not found in any safekeeper metrics")
# Returns True if the corruption signal from PS is propagated to the PG according to the "ps_corruption_detected" metric
# in "neon_perf_counters".
# Raises an exception otherwise.
def check_corruption_signal_propagated_to_pg():
endpoint = workload.endpoint()
results = endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon")
results = endpoint.safe_psql(
"SELECT value FROM neon_perf_counters WHERE metric = 'ps_corruption_detected'"
)
log.info("Query corruption detection metric, results: %s", results)
if results[0][0] == 1:
log.info("Corruption detection signal is raised on Postgres")
return True
raise Exception("Corruption detection signal is not raise on Postgres")
# Confirm that the corruption signal propagates to both the safekeeper and Postgres
wait_until(check_corruption_signal_propagated_to_sk, timeout=10, interval=0.1)
wait_until(check_corruption_signal_propagated_to_pg, timeout=10, interval=0.1)
# Cleanup the failpoint
pageserver_http.configure_failpoints(("create-image-layer-fail-simulated-corruption", "off"))
@pytest.mark.parametrize("enabled", [True, False])
def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool):
tenant_conf = {