From 07c3cfd2a0935d91eb9da859b65ce6a28a4d5c04 Mon Sep 17 00:00:00 2001
From: Suhas Thalanki <54014218+thesuhas@users.noreply.github.com>
Date: Tue, 29 Jul 2025 16:40:07 -0400
Subject: [PATCH] =?UTF-8?q?[BRC-2905]=20Feed=20back=20PS-detected=20data?=
 =?UTF-8?q?=20corruption=20signals=20to=20SK=20and=20PG=E2=80=A6=20(#12748?=
 =?UTF-8?q?)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

… walproposer (#895)

Data corruption is typically detected on the pageserver side when it
replays WAL records. However, since the PS doesn't synchronously replay
WAL records as they are ingested through safekeepers, we need some extra
plumbing to feed information about pageserver-detected corruption during
compaction (and/or WAL redo in general) back to the SK and PG for proper
action. We don't yet know what actions PG/SK should take upon receiving
the signal, but we should have the detection and feedback in place.

Add an extra `corruption_detected` field to the `PageserverFeedback`
message that is sent from PS -> SK -> PG. It's a boolean value that is
set to true when the PS detects a "critical error" that signals data
corruption, and it's sent in all `PageserverFeedback` messages. Upon
receiving this signal, the safekeeper raises a
`safekeeper_ps_corruption_detected` gauge metric (value set to 1). The
safekeeper then forwards the signal to PG, where a
`ps_corruption_detected` gauge metric (value also set to 1) is raised in
the `neon_perf_counters` view.

Added an integration test,
`test_compaction.py::test_ps_corruption_detection_feedback`, which
confirms that the safekeeper and PG can receive the data corruption
signal in the `PageserverFeedback` message under simulated data
corruption.
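For reference, the field rides in the same optional key/value wire
encoding as the other `PageserverFeedback` keys: a NUL-terminated key,
an i32 payload length, then the payload. A minimal sketch of the
encoding (assuming the `bytes` crate used by `pageserver_feedback.rs`;
`put_corruption_detected` is a hypothetical helper, not code from this
patch):

```rust
use bytes::{BufMut, BytesMut};

/// Appends the `corruption_detected` key/value pair to a feedback buffer.
/// Returns the number of keys written so the caller can bump `nkeys`.
/// The pair is written only when the flag is true; receivers that predate
/// this patch fall into their unknown-key branch and skip it, so mixed
/// versions stay wire-compatible.
fn put_corruption_detected(buf: &mut BytesMut, corruption_detected: bool) -> u8 {
    if !corruption_detected {
        return 0;
    }
    buf.put_slice(b"corruption_detected\0"); // NUL-terminated key
    buf.put_i32(1); // payload length in bytes
    buf.put_u8(1); // payload: the boolean as a single byte
    1
}

fn main() {
    let mut buf = BytesMut::new();
    assert_eq!(put_corruption_detected(&mut buf, true), 1);
    // key, then big-endian length 1, then the value byte
    assert_eq!(&buf[..], b"corruption_detected\0\x00\x00\x00\x01\x01");
}
```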
---------

Co-authored-by: William Huang
---
 libs/utils/src/logging.rs                     |  5 +-
 libs/utils/src/pageserver_feedback.rs         | 36 ++++++++
 libs/walproposer/src/api_bindings.rs          |  1 +
 pageserver/src/tenant/timeline.rs             | 18 ++++
 pageserver/src/tenant/timeline/compaction.rs  |  2 +
 .../walreceiver/walreceiver_connection.rs     |  6 ++
 pageserver/src/walingest.rs                   |  7 ++
 pgxn/neon/neon_perf_counters.c                |  2 +
 pgxn/neon/neon_perf_counters.h                |  1 +
 pgxn/neon/walproposer.c                       |  6 ++
 pgxn/neon/walproposer.h                       |  2 +
 pgxn/neon/walproposer_pg.c                    |  6 ++
 safekeeper/src/hadron.rs                      |  1 +
 safekeeper/src/metrics.rs                     | 16 ++++
 safekeeper/src/send_interpreted_wal.rs        |  7 ++
 safekeeper/src/send_wal.rs                    |  9 ++
 safekeeper/src/timeline.rs                    |  2 +
 test_runner/regress/test_compaction.py        | 82 +++++++++++++++++++
 18 files changed, 208 insertions(+), 1 deletion(-)

diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs
index d67c0f123b..9f118048f3 100644
--- a/libs/utils/src/logging.rs
+++ b/libs/utils/src/logging.rs
@@ -34,13 +34,16 @@ macro_rules! critical {
 #[macro_export]
 macro_rules! critical_timeline {
-    ($tenant_shard_id:expr, $timeline_id:expr, $($arg:tt)*) => {{
+    ($tenant_shard_id:expr, $timeline_id:expr, $corruption_detected:expr, $($arg:tt)*) => {{
         if cfg!(debug_assertions) {
             panic!($($arg)*);
         }

         // Increment both metrics
         $crate::logging::TRACING_EVENT_COUNT_METRIC.inc_critical();
         $crate::logging::HADRON_CRITICAL_STORAGE_EVENT_COUNT_METRIC.inc(&$tenant_shard_id.to_string(), &$timeline_id.to_string());
+        if let Some(c) = $corruption_detected.as_ref() {
+            c.store(true, std::sync::atomic::Ordering::Relaxed);
+        }
         let backtrace = std::backtrace::Backtrace::capture();
         tracing::error!("CRITICAL: [tenant_shard_id: {}, timeline_id: {}] {}\n{backtrace}",
             $tenant_shard_id, $timeline_id, format!($($arg)*));
diff --git a/libs/utils/src/pageserver_feedback.rs b/libs/utils/src/pageserver_feedback.rs
index cffbc0b4d6..da5b53306a 100644
--- a/libs/utils/src/pageserver_feedback.rs
+++ b/libs/utils/src/pageserver_feedback.rs
@@ -32,6 +32,9 @@ pub struct PageserverFeedback {
     pub replytime: SystemTime,
     /// Used to track feedbacks from different shards. Always zero for unsharded tenants.
     pub shard_number: u32,
+    /// If true, the pageserver has detected corruption and the safekeeper and postgres
+    /// should stop sending WAL.
+    pub corruption_detected: bool,
 }

 impl PageserverFeedback {
@@ -43,6 +46,7 @@ impl PageserverFeedback {
             disk_consistent_lsn: Lsn::INVALID,
             replytime: *PG_EPOCH,
             shard_number: 0,
+            corruption_detected: false,
         }
     }
@@ -101,6 +105,13 @@ impl PageserverFeedback {
             buf.put_u32(self.shard_number);
         }

+        if self.corruption_detected {
+            nkeys += 1;
+            buf.put_slice(b"corruption_detected\0");
+            buf.put_i32(1);
+            buf.put_u8(1);
+        }
+
         buf[buf_ptr] = nkeys;
     }
@@ -147,6 +158,11 @@ impl PageserverFeedback {
                     assert_eq!(len, 4);
                     rf.shard_number = buf.get_u32();
                 }
+                b"corruption_detected" => {
+                    let len = buf.get_i32();
+                    assert_eq!(len, 1);
+                    rf.corruption_detected = buf.get_u8() != 0;
+                }
                 _ => {
                     let len = buf.get_i32();
                     warn!(
@@ -206,6 +222,26 @@ mod tests {
         assert_eq!(rf, rf_parsed);
     }

+    // Test that databricks-specific fields added to the PageserverFeedback message are serialized
+    // and deserialized correctly, in addition to the existing fields from upstream.
+    #[test]
+    fn test_replication_feedback_databricks_fields() {
+        let mut rf = PageserverFeedback::empty();
+        rf.current_timeline_size = 12345678;
+        rf.last_received_lsn = Lsn(23456789);
+        rf.disk_consistent_lsn = Lsn(34567890);
+        rf.remote_consistent_lsn = Lsn(45678901);
+        rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
+        rf.shard_number = 1;
+        rf.corruption_detected = true;
+
+        let mut data = BytesMut::new();
+        rf.serialize(&mut data);
+
+        let rf_parsed = PageserverFeedback::parse(data.freeze());
+        assert_eq!(rf, rf_parsed);
+    }
+
     #[test]
     fn test_replication_feedback_unknown_key() {
         let mut rf = PageserverFeedback::empty();
diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs
index c3be1e1dae..9f88ea6b11 100644
--- a/libs/walproposer/src/api_bindings.rs
+++ b/libs/walproposer/src/api_bindings.rs
@@ -426,6 +426,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
         remote_consistent_lsn: 0,
         replytime: 0,
         shard_number: 0,
+        corruption_detected: false,
     };

     let empty_wal_rate_limiter = crate::bindings::WalRateLimiter {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 2c70c5cfa5..ff66b0ecc8 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -397,6 +397,11 @@ pub struct Timeline {
     /// If true, the last compaction failed.
     compaction_failed: AtomicBool,

+    /// Begin Hadron: If true, the pageserver has likely detected data corruption in the timeline.
+    /// We need to feed this information back to the Safekeeper and postgres for them to take the
+    /// appropriate action.
+    corruption_detected: AtomicBool,
+
     /// Notifies the tenant compaction loop that there is pending L0 compaction work.
     l0_compaction_trigger: Arc<Notify>,
@@ -3310,6 +3315,7 @@ impl Timeline {
             compaction_lock: tokio::sync::Mutex::default(),
             compaction_failed: AtomicBool::default(),
+            corruption_detected: AtomicBool::default(),
             l0_compaction_trigger: resources.l0_compaction_trigger,
             gc_lock: tokio::sync::Mutex::default(),
@@ -6004,6 +6010,17 @@ impl Timeline {
             )))
         });

+        // Begin Hadron
+        //
+        fail_point!("create-image-layer-fail-simulated-corruption", |_| {
+            self.corruption_detected
+                .store(true, std::sync::atomic::Ordering::Relaxed);
+            Err(CreateImageLayersError::Other(anyhow::anyhow!(
+                "failpoint create-image-layer-fail-simulated-corruption"
+            )))
+        });
+        // End Hadron
+
         let io_concurrency = IoConcurrency::spawn_from_conf(
             self.conf.get_vectored_concurrent_io,
             self.gate
@@ -7149,6 +7166,7 @@ impl Timeline {
                 critical_timeline!(
                     self.tenant_shard_id,
                     self.timeline_id,
+                    Some(&self.corruption_detected),
                     "walredo failure during page reconstruction: {err:?}"
                 );
             }
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 9bca952a46..c5363d84b7 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -1397,6 +1397,7 @@ impl Timeline {
                 critical_timeline!(
                     self.tenant_shard_id,
                     self.timeline_id,
+                    Some(&self.corruption_detected),
                     "missing key during compaction: {err:?}"
                 );
             }
@@ -1441,6 +1442,7 @@ impl Timeline {
                 critical_timeline!(
                     self.tenant_shard_id,
                     self.timeline_id,
+                    Some(&self.corruption_detected),
                     "could not compact, repartitioning keyspace failed: {e:?}"
                 );
             }
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index f619c69599..7ec5aa3b77 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -365,6 +365,7 @@ pub(super) async fn handle_walreceiver_connection(
                 critical_timeline!(
                     timeline.tenant_shard_id,
                     timeline.timeline_id,
+                    Some(&timeline.corruption_detected),
                     "{msg}"
                 );
                 return Err(WalReceiverError::Other(anyhow!(msg)));
@@ -382,6 +383,7 @@ pub(super) async fn handle_walreceiver_connection(
                 critical_timeline!(
                     timeline.tenant_shard_id,
                     timeline.timeline_id,
+                    Some(&timeline.corruption_detected),
                     "{msg}"
                 );
                 return Err(WalReceiverError::Other(anyhow!(msg)));
@@ -455,6 +457,7 @@ pub(super) async fn handle_walreceiver_connection(
                 critical_timeline!(
                     timeline.tenant_shard_id,
                     timeline.timeline_id,
+                    Some(&timeline.corruption_detected),
                     "{err:?}"
                 );
             }
@@ -586,6 +589,9 @@ pub(super) async fn handle_walreceiver_connection(
             remote_consistent_lsn,
             replytime: ts,
             shard_number: timeline.tenant_shard_id.shard_number.0 as u32,
+            corruption_detected: timeline
+                .corruption_detected
+                .load(std::sync::atomic::Ordering::Relaxed),
         };

         debug!("neon_status_update {status_update:?}");
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index 3acf98b020..c364334dab 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -23,6 +23,7 @@
 use std::backtrace::Backtrace;
 use std::collections::HashMap;
+use std::sync::atomic::AtomicBool;
 use std::sync::{Arc, OnceLock};
 use std::time::{Duration, Instant, SystemTime};
@@ -422,6 +423,8 @@ impl WalIngest {
             critical_timeline!(
                 modification.tline.tenant_shard_id,
                 modification.tline.timeline_id,
+                // Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
+                None::<&AtomicBool>,
                 "clear_vm_bits for unknown VM relation {vm_rel}"
             );
             return Ok(());
@@ -431,6 +434,8 @@ impl WalIngest {
             critical_timeline!(
                 modification.tline.tenant_shard_id,
                 modification.tline.timeline_id,
+                // Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
+                None::<&AtomicBool>,
                 "new_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
             );
             new_vm_blk = None;
@@ -441,6 +446,8 @@ impl WalIngest {
             critical_timeline!(
                 modification.tline.tenant_shard_id,
                 modification.tline.timeline_id,
+                // Hadron: No need to raise the corruption flag here; the caller of `ingest_record()` will do it.
+                None::<&AtomicBool>,
                 "old_vm_blk {blknum} not in {vm_rel} of size {vm_size}"
             );
             old_vm_blk = None;
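The `critical_timeline!` call sites above show the new optional-flag
argument in both shapes: pageserver paths that own the `Timeline` pass
`Some(&self.corruption_detected)`, while call sites whose caller raises
the flag (or that have no flag at all, as in the safekeeper) pass
`None::<&AtomicBool>`. A self-contained sketch of the pattern, with a
hypothetical `report_critical` function standing in for the macro:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Sketch of the flag-raising pattern added to `critical_timeline!`:
// `Some(&flag)` latches the corruption signal, `None` leaves it to the caller.
fn report_critical(corruption_detected: Option<&AtomicBool>, msg: &str) {
    if let Some(flag) = corruption_detected {
        // Relaxed suffices: the flag is a sticky boolean polled by the
        // walreceiver feedback loop, with no ordering dependencies.
        flag.store(true, Ordering::Relaxed);
    }
    eprintln!("CRITICAL: {msg}");
}

fn main() {
    let corruption_detected = AtomicBool::new(false);
    report_critical(Some(&corruption_detected), "missing key during compaction");
    assert!(corruption_detected.load(Ordering::Relaxed));
    // Caller-managed case, mirroring the `walingest.rs` sites above.
    report_critical(None, "clear_vm_bits for unknown VM relation");
}
```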
diff --git a/pgxn/neon/neon_perf_counters.c b/pgxn/neon/neon_perf_counters.c
index a38f876a0c..fada4cba1e 100644
--- a/pgxn/neon/neon_perf_counters.c
+++ b/pgxn/neon/neon_perf_counters.c
@@ -45,6 +45,7 @@ DatabricksMetricsShmemInit(void)
 		pg_atomic_init_u32(&databricks_metrics_shared->index_corruption_count, 0);
 		pg_atomic_init_u32(&databricks_metrics_shared->data_corruption_count, 0);
 		pg_atomic_init_u32(&databricks_metrics_shared->internal_error_count, 0);
+		pg_atomic_init_u32(&databricks_metrics_shared->ps_corruption_detected, 0);
 	}
 }
 /* END_HADRON */
@@ -440,6 +441,7 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
 		{"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)},
 		{"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)},
 		{"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)},
+		{"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)},
 		{NULL, false, 0, 0},
 	};
 	for (int i = 0; databricks_metrics[i].name != NULL; i++)
diff --git a/pgxn/neon/neon_perf_counters.h b/pgxn/neon/neon_perf_counters.h
index 0196559806..5c0b7ded7a 100644
--- a/pgxn/neon/neon_perf_counters.h
+++ b/pgxn/neon/neon_perf_counters.h
@@ -183,6 +183,7 @@ typedef struct
 	pg_atomic_uint32 index_corruption_count;
 	pg_atomic_uint32 data_corruption_count;
 	pg_atomic_uint32 internal_error_count;
+	pg_atomic_uint32 ps_corruption_detected;
 } databricks_metrics;

 extern databricks_metrics *databricks_metrics_shared;
diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c
index ba6e4a54ff..c85a6f4b6f 100644
--- a/pgxn/neon/walproposer.c
+++ b/pgxn/neon/walproposer.c
@@ -1887,6 +1887,12 @@ ParsePageserverFeedbackMessage(WalProposer *wp, StringInfo reply_message, Pagese
 		ps_feedback->shard_number = pq_getmsgint(reply_message, sizeof(uint32));
 		psfeedback_log("%u", key, ps_feedback->shard_number);
 	}
+	else if (strcmp(key, "corruption_detected") == 0)
+	{
+		Assert(value_len == 1);
+		ps_feedback->corruption_detected = pq_getmsgbyte(reply_message) != 0;
+		psfeedback_log("%s", key, ps_feedback->corruption_detected ? "true" : "false");
+	}
 	else
 	{
 		/*
diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h
index 5507294c3b..d6cd532bec 100644
--- a/pgxn/neon/walproposer.h
+++ b/pgxn/neon/walproposer.h
@@ -374,6 +374,8 @@ typedef struct PageserverFeedback
 	XLogRecPtr	remote_consistent_lsn;
 	TimestampTz replytime;
 	uint32		shard_number;
+	/* true if the pageserver has detected data corruption in the timeline */
+	bool		corruption_detected;
 } PageserverFeedback;

 /* BEGIN_HADRON */
diff --git a/pgxn/neon/walproposer_pg.c b/pgxn/neon/walproposer_pg.c
index b0f5828d39..da86c5d498 100644
--- a/pgxn/neon/walproposer_pg.c
+++ b/pgxn/neon/walproposer_pg.c
@@ -49,6 +49,7 @@
 #include "libpqwalproposer.h"
 #include "neon.h"
+#include "neon_perf_counters.h"
 #include "neon_walreader.h"
 #include "walproposer.h"
@@ -741,6 +742,11 @@ record_pageserver_feedback(PageserverFeedback *ps_feedback, shardno_t num_shards
 	Assert(ps_feedback->shard_number < MAX_SHARDS);
 	Assert(ps_feedback->shard_number < num_shards);

+	// Begin Hadron: Record any corruption signal from the pageserver first.
+	if (ps_feedback->corruption_detected) {
+		pg_atomic_write_u32(&databricks_metrics_shared->ps_corruption_detected, 1);
+	}
+
 	SpinLockAcquire(&walprop_shared->mutex);

 	// Hadron: Update the num_shards from the source-of-truth (shard map) lazily when we receive
diff --git a/safekeeper/src/hadron.rs b/safekeeper/src/hadron.rs
index f41fe2512d..72b377fcc4 100644
--- a/safekeeper/src/hadron.rs
+++ b/safekeeper/src/hadron.rs
@@ -387,6 +387,7 @@ pub fn get_filesystem_usage(path: &std::path::Path) -> u64 {
             critical_timeline!(
                 placeholder_ttid.tenant_id,
                 placeholder_ttid.timeline_id,
+                None::<&AtomicBool>,
                 "Global disk usage watcher failed to read filesystem usage: {:?}",
                 e
             );
diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs
index b07852aaee..08d96a7aa6 100644
--- a/safekeeper/src/metrics.rs
+++ b/safekeeper/src/metrics.rs
@@ -518,6 +518,7 @@ pub async fn time_io_closure<E: Into<anyhow::Error>>(
 pub struct FullTimelineInfo {
     pub ttid: TenantTimelineId,
     pub ps_feedback_count: u64,
+    pub ps_corruption_detected: bool,
     pub last_ps_feedback: PageserverFeedback,
     pub wal_backup_active: bool,
     pub timeline_is_active: bool,
@@ -547,6 +548,7 @@ pub struct TimelineCollector {
     ps_last_received_lsn: GenericGaugeVec<AtomicU64>,
     feedback_last_time_seconds: GenericGaugeVec<AtomicU64>,
     ps_feedback_count: GenericGaugeVec<AtomicU64>,
+    ps_corruption_detected: IntGaugeVec,
     timeline_active: GenericGaugeVec<AtomicU64>,
     wal_backup_active: GenericGaugeVec<AtomicU64>,
     connected_computes: IntGaugeVec,
@@ -654,6 +656,15 @@ impl TimelineCollector {
         )
         .unwrap();

+        let ps_corruption_detected = IntGaugeVec::new(
+            Opts::new(
+                "safekeeper_ps_corruption_detected",
+                "1 if corruption was detected in the timeline according to feedback from the pageserver, 0 otherwise",
+            ),
+            &["tenant_id", "timeline_id"],
+        )
+        .unwrap();
+
         let timeline_active = GenericGaugeVec::new(
             Opts::new(
                 "safekeeper_timeline_active",
@@ -774,6 +785,7 @@ impl TimelineCollector {
             ps_last_received_lsn,
             feedback_last_time_seconds,
             ps_feedback_count,
+            ps_corruption_detected,
             timeline_active,
             wal_backup_active,
             connected_computes,
@@ -892,6 +904,9 @@ impl Collector for TimelineCollector {
         self.ps_feedback_count
             .with_label_values(labels)
             .set(tli.ps_feedback_count);
+        self.ps_corruption_detected
+            .with_label_values(labels)
+            .set(tli.ps_corruption_detected as i64);
         if let Ok(unix_time) = tli
             .last_ps_feedback
             .replytime
@@ -925,6 +940,7 @@ impl Collector for TimelineCollector {
         mfs.extend(self.ps_last_received_lsn.collect());
         mfs.extend(self.feedback_last_time_seconds.collect());
         mfs.extend(self.ps_feedback_count.collect());
+        mfs.extend(self.ps_corruption_detected.collect());
         mfs.extend(self.timeline_active.collect());
         mfs.extend(self.wal_backup_active.collect());
         mfs.extend(self.connected_computes.collect());
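The new gauge exports 0 or 1 per `(tenant_id, timeline_id)` pair, so a
single latched corruption signal is directly scrape- and alertable. A
small sketch of how the metric surfaces in the Prometheus text format
(assuming the `prometheus` crate, which provides the `IntGaugeVec` and
`Opts` types used above; the label values are made up):

```rust
use prometheus::{Encoder, IntGaugeVec, Opts, Registry, TextEncoder};

fn main() {
    let ps_corruption_detected = IntGaugeVec::new(
        Opts::new(
            "safekeeper_ps_corruption_detected",
            "1 if corruption was detected in the timeline according to feedback from the pageserver, 0 otherwise",
        ),
        &["tenant_id", "timeline_id"],
    )
    .unwrap();

    let registry = Registry::new();
    registry
        .register(Box::new(ps_corruption_detected.clone()))
        .unwrap();

    // A PageserverFeedback with corruption_detected=true latches the gauge to 1.
    let corruption_detected = true;
    ps_corruption_detected
        .with_label_values(&["<tenant_id>", "<timeline_id>"])
        .set(corruption_detected as i64);

    // Render the text exposition format a scraper would see.
    let mut out = Vec::new();
    TextEncoder::new().encode(&registry.gather(), &mut out).unwrap();
    print!("{}", String::from_utf8(out).unwrap());
}
```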
diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs
index 671798298b..bfc4008c52 100644
--- a/safekeeper/src/send_interpreted_wal.rs
+++ b/safekeeper/src/send_interpreted_wal.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::fmt::Display;
 use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
 use std::time::Duration;

 use anyhow::{Context, anyhow};
@@ -305,6 +306,9 @@ impl InterpretedWalReader {
                     critical_timeline!(
                         ttid.tenant_id,
                         ttid.timeline_id,
+                        // Hadron: The corruption flag is only used in PS so that it can feed this information back to SKs.
+                        // We do not use these flags in SKs.
+                        None::<&AtomicBool>,
                         "failed to read WAL record: {err:?}"
                     );
                 }
@@ -375,6 +379,9 @@ impl InterpretedWalReader {
                     critical_timeline!(
                         ttid.tenant_id,
                         ttid.timeline_id,
+                        // Hadron: The corruption flag is only used in PS so that it can feed this information back to SKs.
+                        // We do not use these flags in SKs.
+                        None::<&AtomicBool>,
                         "failed to decode WAL record: {err:?}"
                     );
                 }
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index 5891fa88a4..2d6f7486a9 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -55,6 +55,7 @@ pub struct WalSenders {

 pub struct WalSendersTimelineMetricValues {
     pub ps_feedback_counter: u64,
+    pub ps_corruption_detected: bool,
     pub last_ps_feedback: PageserverFeedback,
     pub interpreted_wal_reader_tasks: usize,
 }
@@ -193,6 +194,7 @@ impl WalSenders {
         WalSendersTimelineMetricValues {
             ps_feedback_counter: shared.ps_feedback_counter,
+            ps_corruption_detected: shared.ps_corruption_detected,
             last_ps_feedback: shared.last_ps_feedback,
             interpreted_wal_reader_tasks,
         }
@@ -209,6 +211,9 @@ impl WalSenders {
         *shared.get_slot_mut(id).get_mut_feedback() = ReplicationFeedback::Pageserver(*feedback);
         shared.last_ps_feedback = *feedback;
         shared.ps_feedback_counter += 1;
+        if feedback.corruption_detected {
+            shared.ps_corruption_detected = true;
+        }
         drop(shared);

         RECEIVED_PS_FEEDBACKS.inc();
@@ -278,6 +283,9 @@ struct WalSendersShared {
     last_ps_feedback: PageserverFeedback,
     // total counter of pageserver feedbacks received
     ps_feedback_counter: u64,
+    // Hadron: true iff we received a pageserver feedback that indicated
+    // data corruption in the timeline
+    ps_corruption_detected: bool,
     slots: Vec<Option<WalSenderState>>,
 }
@@ -328,6 +336,7 @@ impl WalSendersShared {
             agg_standby_feedback: StandbyFeedback::empty(),
             last_ps_feedback: PageserverFeedback::empty(),
             ps_feedback_counter: 0,
+            ps_corruption_detected: false,
             slots: Vec::new(),
         }
     }
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 43b5b3a8d3..e083a49428 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -839,6 +839,7 @@ impl Timeline {
         let WalSendersTimelineMetricValues {
             ps_feedback_counter,
+            ps_corruption_detected,
             last_ps_feedback,
             interpreted_wal_reader_tasks,
         } = self.walsenders.info_for_metrics();
@@ -847,6 +848,7 @@ impl Timeline {
         Some(FullTimelineInfo {
             ttid: self.ttid,
             ps_feedback_count: ps_feedback_counter,
+            ps_corruption_detected,
             last_ps_feedback,
             wal_backup_active: self.wal_backup_active.load(Ordering::Relaxed),
             timeline_is_active: self.broker_active.load(Ordering::Relaxed),
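Note the latch semantics in `WalSenders` above: one feedback with
`corruption_detected = true` marks the timeline, and subsequent healthy
feedback does not clear it. A simplified sketch of that behavior
(locking and the remaining fields omitted):

```rust
// Sketch of the sticky flag in `WalSendersShared` (names from the diff above).
#[derive(Default)]
struct WalSendersShared {
    ps_feedback_counter: u64,
    ps_corruption_detected: bool,
}

impl WalSendersShared {
    fn record_ps_feedback(&mut self, corruption_detected: bool) {
        self.ps_feedback_counter += 1;
        if corruption_detected {
            // Latch: set once, never cleared by later feedback.
            self.ps_corruption_detected = true;
        }
    }
}

fn main() {
    let mut shared = WalSendersShared::default();
    shared.record_ps_feedback(false);
    shared.record_ps_feedback(true); // corruption reported once...
    shared.record_ps_feedback(false); // ...later healthy feedback doesn't clear it
    assert!(shared.ps_corruption_detected);
}
```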
+ """ + # Configure tenant with aggressive compaction settings to easily trigger compaction + TENANT_CONF = { + # Small checkpoint distance to create many layers + "checkpoint_distance": 1024 * 128, + # Compact small layers + "compaction_target_size": 1024 * 128, + # Create image layers eagerly + "image_creation_threshold": 1, + "image_layer_creation_check_threshold": 0, + # Force frequent compaction + "compaction_period": "1s", + } + + env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF) + # We are simulating compaction failures so we should allow these error messages. + env.pageserver.allowed_errors.append(".*Compaction failed.*") + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + pageserver_http = env.pageserver.http_client() + workload = Workload(env, tenant_id, timeline_id) + workload.init() + + # Enable the failpoint that will cause image layer creation to fail due to a (simulated) detected + # corruption. + pageserver_http.configure_failpoints(("create-image-layer-fail-simulated-corruption", "return")) + + # Write some data to trigger compaction and image layer creation + log.info("Writing data to trigger compaction...") + workload.write_rows(1024 * 64, upload=False) + workload.write_rows(1024 * 64, upload=False) + + # Returns True if the corruption signal from PS is propagated to the SK according to the "safekeeper_ps_corruption_detected" metric. + # Raises an exception otherwise. + def check_corruption_signal_propagated_to_sk(): + # Get metrics from all safekeepers + for sk in env.safekeepers: + sk_metrics = sk.http_client().get_metrics() + # Look for our corruption detected metric with the right tenant and timeline + corruption_metrics = sk_metrics.query_all("safekeeper_ps_corruption_detected") + + for metric in corruption_metrics: + # Check if there's a metric for our tenant and timeline that has value 1 + if ( + metric.labels.get("tenant_id") == str(tenant_id) + and metric.labels.get("timeline_id") == str(timeline_id) + and metric.value == 1 + ): + log.info(f"Corruption detected by safekeeper {sk.id}: {metric}") + return True + raise Exception("Corruption detection feedback not found in any safekeeper metrics") + + # Returns True if the corruption signal from PS is propagated to the PG according to the "ps_corruption_detected" metric + # in "neon_perf_counters". + # Raises an exception otherwise. + def check_corruption_signal_propagated_to_pg(): + endpoint = workload.endpoint() + results = endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon") + results = endpoint.safe_psql( + "SELECT value FROM neon_perf_counters WHERE metric = 'ps_corruption_detected'" + ) + log.info("Query corruption detection metric, results: %s", results) + if results[0][0] == 1: + log.info("Corruption detection signal is raised on Postgres") + return True + raise Exception("Corruption detection signal is not raise on Postgres") + + # Confirm that the corruption signal propagates to both the safekeeper and Postgres + wait_until(check_corruption_signal_propagated_to_sk, timeout=10, interval=0.1) + wait_until(check_corruption_signal_propagated_to_pg, timeout=10, interval=0.1) + + # Cleanup the failpoint + pageserver_http.configure_failpoints(("create-image-layer-fail-simulated-corruption", "off")) + + @pytest.mark.parametrize("enabled", [True, False]) def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool): tenant_conf = {