Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-14 08:52:56 +00:00
[BRC-2905] Feed back PS-detected data corruption signals to SK and PG walproposer (#895)
Data corruption is typically detected on the pageserver (PS) side when it replays WAL records. However, since the PS does not replay WAL synchronously as it is ingested through the safekeepers (SK), we need extra plumbing to feed information about pageserver-detected corruption (during compaction and/or WAL redo in general) back to the SK and to the PG walproposer so they can take proper action. We don't yet know what actions PG/SK should take upon receiving the signal, but the detection and feedback should be in place.

Add a `corruption_detected` field to the `PageserverFeedback` message that is sent from PS -> SK -> PG. It is a boolean that is set to true when the PS detects a "critical error" signaling data corruption, and it is included in every `PageserverFeedback` message (a sketch of the wire encoding appears below, after the commit metadata). Upon receiving this signal, the safekeeper raises a `safekeeper_ps_corruption_detected` gauge metric (value set to 1). The safekeeper then forwards the signal to PG, where a `ps_corruption_detected` gauge metric (also set to 1) is raised in the `neon_perf_counters` view.

Added an integration test, `test_compaction.py::test_ps_corruption_detection_feedback`, which confirms that the safekeeper and PG receive the data corruption signal in the `PageserverFeedback` message under simulated data corruption.
Committed by Suhas Thalanki
Parent: 043408a88d
Commit: f7dd2108bb
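For orientation, the following is a minimal, self-contained sketch of the optional key/value encoding that `PageserverFeedback::serialize` and `PageserverFeedback::parse` use for the new flag, mirroring the hunks further down: a leading key-count byte, then for each optional key a NUL-terminated name, an i32 value length, and the value itself (a single byte for the boolean). The `FeedbackSketch` type, its reduced field set, and the `main` round-trip are invented for this illustration and assume the `bytes` crate; the real message carries the full feedback payload.

use bytes::{Buf, BufMut, Bytes, BytesMut};

// Reduced stand-in for PageserverFeedback: only the new flag is modeled here.
#[derive(Debug, Default, PartialEq)]
struct FeedbackSketch {
    corruption_detected: bool,
}

impl FeedbackSketch {
    // Append the flag as an optional key/value pair: NUL-terminated key name,
    // i32 value length, then the value itself (one byte, 0 or 1).
    fn serialize(&self, buf: &mut BytesMut) {
        let mut nkeys: u8 = 0;
        let key_count_pos = buf.len();
        buf.put_u8(0); // placeholder for the key count, patched below

        if self.corruption_detected {
            nkeys += 1;
            buf.put_slice(b"corruption_detected\0");
            buf.put_i32(1); // value length in bytes
            buf.put_u8(1); // boolean encoded as a single byte
        }

        buf[key_count_pos] = nkeys;
    }

    // Read the key/value pairs back; unknown keys are skipped by their length.
    fn parse(mut buf: Bytes) -> Self {
        let mut out = FeedbackSketch::default();
        let nkeys = buf.get_u8();
        for _ in 0..nkeys {
            let nul = buf.iter().position(|b| *b == 0).expect("missing NUL");
            let key = buf.split_to(nul);
            buf.advance(1); // skip the NUL terminator
            let len = buf.get_i32() as usize;
            match key.as_ref() {
                b"corruption_detected" => {
                    assert_eq!(len, 1);
                    out.corruption_detected = buf.get_u8() != 0;
                }
                _ => buf.advance(len), // ignore keys we don't know about
            }
        }
        out
    }
}

fn main() {
    let fb = FeedbackSketch { corruption_detected: true };
    let mut data = BytesMut::new();
    fb.serialize(&mut data);
    assert_eq!(fb, FeedbackSketch::parse(data.freeze()));
}

In the actual patch the key-count byte and the other feedback keys already exist; the change only appends one more optional key and increments `nkeys` when the flag is set, and the unknown-key arm in the real `parse` handles keys a consumer does not recognize.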
@@ -34,13 +34,16 @@ macro_rules! critical {

#[macro_export]
macro_rules! critical_timeline {
    ($tenant_shard_id:expr, $timeline_id:expr, $($arg:tt)*) => {{
    ($tenant_shard_id:expr, $timeline_id:expr, $corruption_detected:expr, $($arg:tt)*) => {{
        if cfg!(debug_assertions) {
            panic!($($arg)*);
        }
        // Increment both metrics
        $crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
        $crate::logging::HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC.inc(&$tenant_shard_id.to_string(), &$timeline_id.to_string());
        if let Some(c) = $corruption_detected.as_ref() {
            c.store(true, std::sync::atomic::Ordering::Relaxed);
        }
        let backtrace = std::backtrace::Backtrace::capture();
        tracing::error!("CRITICAL: [tenant_shard_id: {}, timeline_id: {}] {}\n{backtrace}",
            $tenant_shard_id, $timeline_id, format!($($arg)*));
@@ -32,6 +32,9 @@ pub struct PageserverFeedback {
    pub replytime: SystemTime,
    /// Used to track feedbacks from different shards. Always zero for unsharded tenants.
    pub shard_number: u32,
    /// If true, the pageserver has detected corruption and the safekeeper and postgres
    /// should stop sending WAL.
    pub corruption_detected: bool,
}

impl PageserverFeedback {

@@ -43,6 +46,7 @@ impl PageserverFeedback {
            disk_consistent_lsn: Lsn::INVALID,
            replytime: *PG_EPOCH,
            shard_number: 0,
            corruption_detected: false,
        }
    }

@@ -101,6 +105,13 @@ impl PageserverFeedback {
            buf.put_u32(self.shard_number);
        }

        if self.corruption_detected {
            nkeys += 1;
            buf.put_slice(b"corruption_detected\0");
            buf.put_i32(1);
            buf.put_u8(1);
        }

        buf[buf_ptr] = nkeys;
    }
@@ -147,6 +158,11 @@ impl PageserverFeedback {
                    assert_eq!(len, 4);
                    rf.shard_number = buf.get_u32();
                }
                b"corruption_detected" => {
                    let len = buf.get_i32();
                    assert_eq!(len, 1);
                    rf.corruption_detected = buf.get_u8() != 0;
                }
                _ => {
                    let len = buf.get_i32();
                    warn!(

@@ -206,6 +222,26 @@ mod tests {
        assert_eq!(rf, rf_parsed);
    }

    // Test that databricks-specific fields added to the PageserverFeedback message are serialized
    // and deserialized correctly, in addition to the existing fields from upstream.
    #[test]
    fn test_replication_feedback_databricks_fields() {
        let mut rf = PageserverFeedback::empty();
        rf.current_timeline_size = 12345678;
        rf.last_received_lsn = Lsn(23456789);
        rf.disk_consistent_lsn = Lsn(34567890);
        rf.remote_consistent_lsn = Lsn(45678901);
        rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
        rf.shard_number = 1;
        rf.corruption_detected = true;

        let mut data = BytesMut::new();
        rf.serialize(&mut data);

        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(rf, rf_parsed);
    }

    #[test]
    fn test_replication_feedback_unknown_key() {
        let mut rf = PageserverFeedback::empty();
@@ -426,6 +426,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
        remote_consistent_lsn: 0,
        replytime: 0,
        shard_number: 0,
        corruption_detected: false,
    };

    let empty_wal_rate_limiter = crate::bindings::WalRateLimiter {
@@ -2914,6 +2914,7 @@ pub(crate) struct WalIngestMetrics {
    pub(crate) records_received: IntCounter,
    pub(crate) records_observed: IntCounter,
    pub(crate) records_committed: IntCounter,
    pub(crate) records_filtered: IntCounter,
    pub(crate) values_committed_metadata_images: IntCounter,
    pub(crate) values_committed_metadata_deltas: IntCounter,
    pub(crate) values_committed_data_images: IntCounter,

@@ -2969,6 +2970,11 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
        "Number of WAL records which resulted in writes to pageserver storage"
    )
    .expect("failed to define a metric"),
    records_filtered: register_int_counter!(
        "pageserver_wal_ingest_records_filtered",
        "Number of WAL records filtered out due to sharding"
    )
    .expect("failed to define a metric"),
    values_committed_metadata_images: values_committed.with_label_values(&["metadata", "image"]),
    values_committed_metadata_deltas: values_committed.with_label_values(&["metadata", "delta"]),
    values_committed_data_images: values_committed.with_label_values(&["data", "image"]),
@@ -397,6 +397,11 @@ pub struct Timeline {
    /// If true, the last compaction failed.
    compaction_failed: AtomicBool,

    /// Begin Hadron: If true, the pageserver has likely detected data corruption in the timeline.
    /// We need to feed this information back to the Safekeeper and postgres for them to take the
    /// appropriate action.
    corruption_detected: AtomicBool,

    /// Notifies the tenant compaction loop that there is pending L0 compaction work.
    l0_compaction_trigger: Arc<Notify>,

@@ -3310,6 +3315,7 @@ impl Timeline {

            compaction_lock: tokio::sync::Mutex::default(),
            compaction_failed: AtomicBool::default(),
            corruption_detected: AtomicBool::default(),
            l0_compaction_trigger: resources.l0_compaction_trigger,
            gc_lock: tokio::sync::Mutex::default(),
@@ -6004,6 +6010,17 @@ impl Timeline {
            )))
        });

        // Begin Hadron
        //
        fail_point!("create-image-layer-fail-simulated-corruption", |_| {
            self.corruption_detected
                .store(true, std::sync::atomic::Ordering::Relaxed);
            Err(CreateImageLayersError::Other(anyhow::anyhow!(
                "failpoint create-image-layer-fail-simulated-corruption"
            )))
        });
        // End Hadron

        let io_concurrency = IoConcurrency::spawn_from_conf(
            self.conf.get_vectored_concurrent_io,
            self.gate
@@ -7143,6 +7160,7 @@ impl Timeline {
                critical_timeline!(
                    self.tenant_shard_id,
                    self.timeline_id,
                    Some(&self.corruption_detected),
                    "walredo failure during page reconstruction: {err:?}"
                );
            }

@@ -1397,6 +1397,7 @@ impl Timeline {
                critical_timeline!(
                    self.tenant_shard_id,
                    self.timeline_id,
                    Some(&self.corruption_detected),
                    "missing key during compaction: {err:?}"
                );
            }

@@ -1441,6 +1442,7 @@ impl Timeline {
                critical_timeline!(
                    self.tenant_shard_id,
                    self.timeline_id,
                    Some(&self.corruption_detected),
                    "could not compact, repartitioning keyspace failed: {e:?}"
                );
            }
@@ -13,6 +13,7 @@ use fail::fail_point;
use futures::StreamExt;
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::WAL_SEGMENT_SIZE;
use postgres_ffi::v14::xlog_utils::normalize_lsn;
use postgres_ffi::waldecoder::WalDecodeError;

@@ -25,13 +26,13 @@ use tokio_postgres::replication::ReplicationStream;
use tokio_postgres::{Client, SimpleQueryMessage, SimpleQueryRow};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, trace, warn};
use utils::critical_timeline;
use utils::{critical_timeline};
use utils::id::NodeId;
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use utils::postgres_client::PostgresClientProtocol;
use utils::sync::gate::GateError;
use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecords};
use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords};
use wal_decoder::wire_format::FromWireFormat;

use super::TaskStateUpdate;
@@ -275,6 +276,8 @@ pub(super) async fn handle_walreceiver_connection(
    let copy_stream = replication_client.copy_both_simple(&query).await?;
    let mut physical_stream = pin!(ReplicationStream::new(copy_stream));

    let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);

    let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
        .await
        .map_err(|e| match e.kind {

@@ -282,6 +285,8 @@ pub(super) async fn handle_walreceiver_connection(
            _ => WalReceiverError::Other(e.into()),
        })?;

    let shard = vec![*timeline.get_shard_identity()];

    let (format, compression) = match protocol {
        PostgresClientProtocol::Interpreted {
            format,

@@ -365,6 +370,7 @@ pub(super) async fn handle_walreceiver_connection(
                critical_timeline!(
                    timeline.tenant_shard_id,
                    timeline.timeline_id,
                    Some(&timeline.corruption_detected),
                    "{msg}"
                );
                return Err(WalReceiverError::Other(anyhow!(msg)));

@@ -382,6 +388,7 @@ pub(super) async fn handle_walreceiver_connection(
                critical_timeline!(
                    timeline.tenant_shard_id,
                    timeline.timeline_id,
                    Some(&timeline.corruption_detected),
                    "{msg}"
                );
                return Err(WalReceiverError::Other(anyhow!(msg)));

@@ -455,7 +462,9 @@ pub(super) async fn handle_walreceiver_connection(
                critical_timeline!(
                    timeline.tenant_shard_id,
                    timeline.timeline_id,
                    Some(&timeline.corruption_detected),
                    "{err:?}"
                );
            }
        })?;
@@ -509,6 +518,143 @@ pub(super) async fn handle_walreceiver_connection(
                Some(streaming_lsn)
            }

            ReplicationMessage::XLogData(xlog_data) => {
                async fn commit(
                    modification: &mut DatadirModification<'_>,
                    uncommitted: &mut u64,
                    filtered: &mut u64,
                    ctx: &RequestContext,
                ) -> anyhow::Result<()> {
                    let stats = modification.stats();
                    modification.commit(ctx).await?;
                    WAL_INGEST
                        .records_committed
                        .inc_by(*uncommitted - *filtered);
                    WAL_INGEST.inc_values_committed(&stats);
                    *uncommitted = 0;
                    *filtered = 0;
                    Ok(())
                }

                // Pass the WAL data to the decoder, and see if we can decode
                // more records as a result.
                let data = xlog_data.data();
                let startlsn = Lsn::from(xlog_data.wal_start());
                let endlsn = startlsn + data.len() as u64;

                trace!("received XLogData between {startlsn} and {endlsn}");

                WAL_INGEST.bytes_received.inc_by(data.len() as u64);
                waldecoder.feed_bytes(data);

                {
                    let mut modification = timeline.begin_modification(startlsn);
                    let mut uncommitted_records = 0;
                    let mut filtered_records = 0;

                    while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
                        // It is important to deal with the aligned records as lsn in getPage@LSN is
                        // aligned and can be several bytes bigger. Without this alignment we are
                        // at risk of hitting a deadlock.
                        if !next_record_lsn.is_aligned() {
                            return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
                        }

                        // Deserialize and interpret WAL record
                        let interpreted = InterpretedWalRecord::from_bytes_filtered(
                            recdata,
                            &shard,
                            next_record_lsn,
                            modification.tline.pg_version,
                        )?
                        .remove(timeline.get_shard_identity())
                        .unwrap();

                        if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
                            && uncommitted_records > 0
                        {
                            // Special case: legacy PG database creations operate by reading pages from a 'template' database:
                            // these are the only kinds of WAL record that require reading data blocks while ingesting. Ensure
                            // all earlier writes of data blocks are visible by committing any modification in flight.
                            commit(
                                &mut modification,
                                &mut uncommitted_records,
                                &mut filtered_records,
                                &ctx,
                            )
                            .await?;
                        }

                        // Ingest the records without immediately committing them.
                        timeline.metrics.wal_records_received.inc();
                        let ingested = walingest
                            .ingest_record(interpreted, &mut modification, &ctx)
                            .await
                            .with_context(|| {
                                format!("could not ingest record at {next_record_lsn}")
                            })
                            .inspect_err(|err| {
                                // TODO: we can't differentiate cancellation errors with
                                // anyhow::Error, so just ignore it if we're cancelled.
                                if !cancellation.is_cancelled() {
                                    critical_timeline!(
                                        timeline.tenant_shard_id,
                                        timeline.timeline_id,
                                        Some(&timeline.corruption_detected),
                                        "{err:?}"
                                    )
                                }
                            })?;
                        if !ingested {
                            tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
                            WAL_INGEST.records_filtered.inc();
                            filtered_records += 1;
                        }

                        // FIXME: this cannot be made pausable_failpoint without fixing the
                        // failpoint library; in tests, the added amount of debugging will cause us
                        // to timeout the tests.
                        fail_point!("walreceiver-after-ingest");

                        last_rec_lsn = next_record_lsn;

                        // Commit every ingest_batch_size records. Even if we filtered out
                        // all records, we still need to call commit to advance the LSN.
                        uncommitted_records += 1;
                        if uncommitted_records >= ingest_batch_size
                            || modification.approx_pending_bytes()
                                > DatadirModification::MAX_PENDING_BYTES
                        {
                            commit(
                                &mut modification,
                                &mut uncommitted_records,
                                &mut filtered_records,
                                &ctx,
                            )
                            .await?;
                        }
                    }

                    // Commit the remaining records.
                    if uncommitted_records > 0 {
                        commit(
                            &mut modification,
                            &mut uncommitted_records,
                            &mut filtered_records,
                            &ctx,
                        )
                        .await?;
                    }
                }

                if !caught_up && endlsn >= end_of_wal {
                    info!("caught up at LSN {endlsn}");
                    caught_up = true;
                }

                Some(endlsn)
            }

            ReplicationMessage::PrimaryKeepAlive(keepalive) => {
                let wal_end = keepalive.wal_end();
                let timestamp = keepalive.timestamp();
@@ -586,6 +732,9 @@ pub(super) async fn handle_walreceiver_connection(
            remote_consistent_lsn,
            replytime: ts,
            shard_number: timeline.tenant_shard_id.shard_number.0 as u32,
            corruption_detected: timeline
                .corruption_detected
                .load(std::sync::atomic::Ordering::Relaxed),
        };

        debug!("neon_status_update {status_update:?}");

@@ -23,6 +23,7 @@

use std::backtrace::Backtrace;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant, SystemTime};
@@ -422,6 +423,8 @@ impl WalIngest {
                critical_timeline!(
                    modification.tline.tenant_shard_id,
                    modification.tline.timeline_id,
                    // Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
                    None::<&AtomicBool>,
                    "clear_vm_bits for unknown VM relation {vm_rel}"
                );
                return Ok(());

@@ -431,6 +434,8 @@ impl WalIngest {
                critical_timeline!(
                    modification.tline.tenant_shard_id,
                    modification.tline.timeline_id,
                    // Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
                    None::<&AtomicBool>,
                    "new_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
                );
                new_vm_blk = None;

@@ -441,6 +446,8 @@ impl WalIngest {
                critical_timeline!(
                    modification.tline.tenant_shard_id,
                    modification.tline.timeline_id,
                    // Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
                    None::<&AtomicBool>,
                    "old_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
                );
                old_vm_blk = None;
@@ -45,6 +45,7 @@ DatabricksMetricsShmemInit(void)
        pg_atomic_init_u32(&databricks_metrics_shared->index_corruption_count, 0);
        pg_atomic_init_u32(&databricks_metrics_shared->data_corruption_count, 0);
        pg_atomic_init_u32(&databricks_metrics_shared->internal_error_count, 0);
        pg_atomic_init_u32(&databricks_metrics_shared->ps_corruption_detected, 0);
    }
}
/* END_HADRON */

@@ -440,6 +441,7 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
        {"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)},
        {"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)},
        {"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)},
        {"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)},
        {NULL, false, 0, 0},
    };
    for (int i = 0; databricks_metrics[i].name != NULL; i++)

@@ -187,6 +187,7 @@ typedef struct
    pg_atomic_uint32 index_corruption_count;
    pg_atomic_uint32 data_corruption_count;
    pg_atomic_uint32 internal_error_count;
    pg_atomic_uint32 ps_corruption_detected;
} databricks_metrics;

extern databricks_metrics *databricks_metrics_shared;
@@ -1887,6 +1887,12 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
            ps_feedback->shard_number = pq_getmsgint(reply_message, sizeof(uint32));
            psfeedback_log("%u", key, ps_feedback->shard_number);
        }
        else if (strcmp(key, "corruption_detected") == 0)
        {
            Assert(value_len == 1);
            ps_feedback->corruption_detected = pq_getmsgbyte(reply_message) != 0;
            psfeedback_log("%s", key, ps_feedback->corruption_detected ? "true" : "false");
        }
        else
        {
            /*

@@ -374,6 +374,8 @@ typedef struct PageserverFeedback
    XLogRecPtr remote_consistent_lsn;
    TimestampTz replytime;
    uint32 shard_number;
    /* true if the pageserver has detected data corruption in the timeline */
    bool corruption_detected;
} PageserverFeedback;

/* BEGIN_HADRON */

@@ -49,6 +49,7 @@

#include "libpqwalproposer.h"
#include "neon.h"
#include "neon_perf_counters.h"
#include "neon_walreader.h"
#include "walproposer.h"
@@ -715,6 +716,11 @@ record_pageserver_feedback(PageserverFeedback *ps_feedback, shardno_t num_shards
    Assert(ps_feedback->shard_number < MAX_SHARDS);
    Assert(ps_feedback->shard_number < num_shards);

    // Begin Hadron: Record any corruption signal from the pageserver first.
    if (ps_feedback->corruption_detected) {
        pg_atomic_write_u32(&databricks_metrics_shared->ps_corruption_detected, 1);
    }

    SpinLockAcquire(&walprop_shared->mutex);

    // Hadron: Update the num_shards from the source-of-truth (shard map) lazily when we receive
@@ -387,6 +387,7 @@ pub fn get_filesystem_usage(path: &std::path::Path) -> u64 {
            critical_timeline!(
                placeholder_ttid.tenant_id,
                placeholder_ttid.timeline_id,
                None::<&AtomicBool>,
                "Global disk usage watcher failed to read filesystem usage: {:?}",
                e
            );

@@ -518,6 +518,7 @@ pub async fn time_io_closure<E: Into<anyhow::Error>>(
pub struct FullTimelineInfo {
    pub ttid: TenantTimelineId,
    pub ps_feedback_count: u64,
    pub ps_corruption_detected: bool,
    pub last_ps_feedback: PageserverFeedback,
    pub wal_backup_active: bool,
    pub timeline_is_active: bool,
@@ -547,6 +548,7 @@ pub struct TimelineCollector {
    ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
    feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
    ps_feedback_count: GenericGaugeVec<AtomicU64>,
    ps_corruption_detected: IntGaugeVec,
    timeline_active: GenericGaugeVec<AtomicU64>,
    wal_backup_active: GenericGaugeVec<AtomicU64>,
    connected_computes: IntGaugeVec,

@@ -654,6 +656,15 @@ impl TimelineCollector {
        )
        .unwrap();

        let ps_corruption_detected = IntGaugeVec::new(
            Opts::new(
                "safekeeper_ps_corruption_detected",
                "1 if corruption was detected in the timeline according to feedback from the pageserver, 0 otherwise",
            ),
            &["tenant_id", "timeline_id"],
        )
        .unwrap();

        let timeline_active = GenericGaugeVec::new(
            Opts::new(
                "safekeeper_timeline_active",

@@ -774,6 +785,7 @@ impl TimelineCollector {
            ps_last_received_lsn,
            feedback_last_time_seconds,
            ps_feedback_count,
            ps_corruption_detected,
            timeline_active,
            wal_backup_active,
            connected_computes,

@@ -892,6 +904,9 @@ impl Collector for TimelineCollector {
            self.ps_feedback_count
                .with_label_values(labels)
                .set(tli.ps_feedback_count);
            self.ps_corruption_detected
                .with_label_values(labels)
                .set(tli.ps_corruption_detected as i64);
            if let Ok(unix_time) = tli
                .last_ps_feedback
                .replytime

@@ -925,6 +940,7 @@ impl Collector for TimelineCollector {
        mfs.extend(self.ps_last_received_lsn.collect());
        mfs.extend(self.feedback_last_time_seconds.collect());
        mfs.extend(self.ps_feedback_count.collect());
        mfs.extend(self.ps_corruption_detected.collect());
        mfs.extend(self.timeline_active.collect());
        mfs.extend(self.wal_backup_active.collect());
        mfs.extend(self.connected_computes.collect());
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;

@@ -305,7 +306,10 @@ impl InterpretedWalReader {
                critical_timeline!(
                    ttid.tenant_id,
                    ttid.timeline_id,
                    "failed to read WAL record: {err:?}"
                    // Hadron: The corruption flag is only used in PS so that it can feed this information back to SKs.
                    // We do not use these flags in SKs.
                    None::<&AtomicBool>,
                    "failed to read WAL record: {err:?}"
                );
            }
            err => error!("failed to read WAL record: {err}"),

@@ -375,7 +379,10 @@ impl InterpretedWalReader {
                critical_timeline!(
                    ttid.tenant_id,
                    ttid.timeline_id,
                    "failed to decode WAL record: {err:?}"
                    // Hadron: The corruption flag is only used in PS so that it can feed this information back to SKs.
                    // We do not use these flags in SKs.
                    None::<&AtomicBool>,
                    "failed to decode WAL record: {err:?}"
                );
            }
            Err(err) => error!("failed to read WAL record: {err}"),
@@ -55,6 +55,7 @@ pub struct WalSenders {

pub struct WalSendersTimelineMetricValues {
    pub ps_feedback_counter: u64,
    pub ps_corruption_detected: bool,
    pub last_ps_feedback: PageserverFeedback,
    pub interpreted_wal_reader_tasks: usize,
}

@@ -193,6 +194,7 @@ impl WalSenders {

        WalSendersTimelineMetricValues {
            ps_feedback_counter: shared.ps_feedback_counter,
            ps_corruption_detected: shared.ps_corruption_detected,
            last_ps_feedback: shared.last_ps_feedback,
            interpreted_wal_reader_tasks,
        }

@@ -209,6 +211,9 @@ impl WalSenders {
        *shared.get_slot_mut(id).get_mut_feedback() = ReplicationFeedback::Pageserver(*feedback);
        shared.last_ps_feedback = *feedback;
        shared.ps_feedback_counter += 1;
        if feedback.corruption_detected {
            shared.ps_corruption_detected = true;
        }
        drop(shared);

        RECEIVED_PS_FEEDBACKS.inc();
@@ -278,6 +283,9 @@ struct WalSendersShared {
    last_ps_feedback: PageserverFeedback,
    // total counter of pageserver feedbacks received
    ps_feedback_counter: u64,
    // Hadron: true iff we received a pageserver feedback that indicated
    // data corruption in the timeline
    ps_corruption_detected: bool,
    slots: Vec<Option<WalSenderState>>,
}

@@ -328,6 +336,7 @@ impl WalSendersShared {
        agg_standby_feedback: StandbyFeedback::empty(),
        last_ps_feedback: PageserverFeedback::empty(),
        ps_feedback_counter: 0,
        ps_corruption_detected: false,
        slots: Vec::new(),
    }
}
@@ -839,6 +839,7 @@ impl Timeline {

        let WalSendersTimelineMetricValues {
            ps_feedback_counter,
            ps_corruption_detected,
            last_ps_feedback,
            interpreted_wal_reader_tasks,
        } = self.walsenders.info_for_metrics();

@@ -847,6 +848,7 @@ impl Timeline {
        Some(FullTimelineInfo {
            ttid: self.ttid,
            ps_feedback_count: ps_feedback_counter,
            ps_corruption_detected,
            last_ps_feedback,
            wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
            timeline_is_active: self.broker_active.load(Ordering::Relaxed),
scripts/neon_grep.txt (new file, 2189 lines): file diff suppressed because it is too large.
@@ -863,6 +863,87 @@ def test_pageserver_compaction_circuit_breaker(neon_env_builder: NeonEnvBuilder)
    assert not env.pageserver.log_contains(".*Circuit breaker failure ended.*")


def test_ps_corruption_detection_feedback(neon_env_builder: NeonEnvBuilder):
    """
    Test that when the pageserver detects corruption during image layer creation,
    it sends corruption feedback to the safekeeper, which records it in its
    safekeeper_ps_corruption_detected metric.
    """
    # Configure the tenant with aggressive compaction settings to easily trigger compaction
    TENANT_CONF = {
        # Small checkpoint distance to create many layers
        "checkpoint_distance": 1024 * 128,
        # Compact small layers
        "compaction_target_size": 1024 * 128,
        # Create image layers eagerly
        "image_creation_threshold": 1,
        "image_layer_creation_check_threshold": 0,
        # Force frequent compaction
        "compaction_period": "1s",
    }

    env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
    # We are simulating compaction failures, so we should allow these error messages.
    env.pageserver.allowed_errors.append(".*Compaction failed.*")
    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    pageserver_http = env.pageserver.http_client()
    workload = Workload(env, tenant_id, timeline_id)
    workload.init()

    # Enable the failpoint that will cause image layer creation to fail due to a (simulated) detected
    # corruption.
    pageserver_http.configure_failpoints(("create-image-layer-fail-simulated-corruption", "return"))

    # Write some data to trigger compaction and image layer creation
    log.info("Writing data to trigger compaction...")
    workload.write_rows(1024 * 64, upload=False)
    workload.write_rows(1024 * 64, upload=False)

    # Returns True if the corruption signal from PS is propagated to the SK according to the
    # "safekeeper_ps_corruption_detected" metric. Raises an exception otherwise.
    def check_corruption_signal_propagated_to_sk():
        # Get metrics from all safekeepers
        for sk in env.safekeepers:
            sk_metrics = sk.http_client().get_metrics()
            # Look for our corruption detected metric with the right tenant and timeline
            corruption_metrics = sk_metrics.query_all("safekeeper_ps_corruption_detected")

            for metric in corruption_metrics:
                # Check if there's a metric for our tenant and timeline that has value 1
                if (
                    metric.labels.get("tenant_id") == str(tenant_id)
                    and metric.labels.get("timeline_id") == str(timeline_id)
                    and metric.value == 1
                ):
                    log.info(f"Corruption detected by safekeeper {sk.id}: {metric}")
                    return True
        raise Exception("Corruption detection feedback not found in any safekeeper metrics")

    # Returns True if the corruption signal from PS is propagated to PG according to the
    # "ps_corruption_detected" metric in "neon_perf_counters". Raises an exception otherwise.
    def check_corruption_signal_propagated_to_pg():
        endpoint = workload.endpoint()
        results = endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon")
        results = endpoint.safe_psql(
            "SELECT value FROM neon_perf_counters WHERE metric = 'ps_corruption_detected'"
        )
        log.info("Query corruption detection metric, results: %s", results)
        if results[0][0] == 1:
            log.info("Corruption detection signal is raised on Postgres")
            return True
        raise Exception("Corruption detection signal is not raised on Postgres")

    # Confirm that the corruption signal propagates to both the safekeeper and Postgres
    wait_until(check_corruption_signal_propagated_to_sk, timeout=10, interval=0.1)
    wait_until(check_corruption_signal_propagated_to_pg, timeout=10, interval=0.1)

    # Clean up the failpoint
    pageserver_http.configure_failpoints(("create-image-layer-fail-simulated-corruption", "off"))


@pytest.mark.parametrize("enabled", [True, False])
def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool):
    tenant_conf = {