diff --git a/.gitmodules b/.gitmodules index e381fb079e..ac59e411f9 100644 --- a/.gitmodules +++ b/.gitmodules @@ -9,7 +9,7 @@ [submodule "vendor/postgres-v16"] path = vendor/postgres-v16 url = ../postgres.git - branch = REL_16_STABLE_neon + branch = cherry-pick/16/37717cc9430 [submodule "vendor/postgres-v17"] path = vendor/postgres-v17 url = ../postgres.git diff --git a/libs/walproposer/src/api_bindings.rs b/libs/walproposer/src/api_bindings.rs index c3be1e1dae..b6c1d7e48e 100644 --- a/libs/walproposer/src/api_bindings.rs +++ b/libs/walproposer/src/api_bindings.rs @@ -450,6 +450,9 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState { replica_promote: false, min_ps_feedback: empty_feedback, wal_rate_limiter: empty_wal_rate_limiter, + num_safekeepers: 0, + safekeeper_status: [0; 32], + safekeeper_commit_lsn: [0; 32], } } diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index caffdc9612..ff7ec05ba4 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -1312,7 +1312,6 @@ check_neon_id(char **newval, void **extra, GucSource source) return **newval == '\0' || HexDecodeString(id, *newval, 16); } - void PagestoreShmemInit(void) { diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 76f3cf2e87..8af4399283 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -81,6 +81,8 @@ uint32 WAIT_EVENT_NEON_PS_READ; uint32 WAIT_EVENT_NEON_WAL_DL; #endif +int databricks_test_hook = 0; + enum RunningXactsOverflowPolicies { OP_IGNORE, OP_SKIP, @@ -445,6 +447,20 @@ ReportSearchPath(void) static int neon_pgstat_file_size_limit; #endif +#if PG_VERSION_NUM >= 160000 && PG_VERSION_NUM < 170000 +if (lakebase_mode) { + static void DatabricksSqlErrorHookImpl(int sqlerrcode) { + if (sqlerrcode == ERRCODE_DATA_CORRUPTED) { + pg_atomic_fetch_add_u32(&databricks_metrics_shared->data_corruption_count, 1); + } else if (sqlerrcode == ERRCODE_INDEX_CORRUPTED) { + pg_atomic_fetch_add_u32(&databricks_metrics_shared->index_corruption_count, 1); + } else if (sqlerrcode == ERRCODE_INTERNAL_ERROR) { + pg_atomic_fetch_add_u32(&databricks_metrics_shared->internal_error_count, 1); + } + } +} +#endif + void _PG_init(void) { @@ -456,6 +472,12 @@ _PG_init(void) load_file("$libdir/neon_rmgr", false); #endif +#if PG_VERSION_NUM >= 160000 && PG_VERSION_NUM < 170000 + if (lakebase_mode) { + SqlErrorCode_hook = DatabricksSqlErrorHookImpl; + } +#endif + /* * Initializing a pre-loaded Postgres extension happens in three stages: * @@ -594,6 +616,19 @@ _PG_init(void) 0, NULL, NULL, NULL); + // A test hook used in sql regress to trigger specific behaviors + // to test features easily. + DefineCustomIntVariable( + "databricks.test_hook", + "The test hook used in sql regress tests only", + NULL, + &databricks_test_hook, + 0, + 0, INT32_MAX, + PGC_SUSET, + 0, + NULL, NULL, NULL); + /* * Important: This must happen after other parts of the extension are * loaded, otherwise any settings to GUCs that were set before the @@ -743,6 +778,9 @@ neon_shmem_startup_hook(void) LfcShmemInit(); NeonPerfCountersShmemInit(); + if (lakebase_mode) { + DatabricksMetricsShmemInit(); + } PagestoreShmemInit(); RelsizeCacheShmemInit(); WalproposerShmemInit(); diff --git a/pgxn/neon/neon_perf_counters.c b/pgxn/neon/neon_perf_counters.c index dd576e4e73..54c7f19589 100644 --- a/pgxn/neon/neon_perf_counters.c +++ b/pgxn/neon/neon_perf_counters.c @@ -19,7 +19,36 @@ #include "neon.h" #include "neon_perf_counters.h" -#include "neon_pgversioncompat.h" +#include "walproposer.h" + +/* BEGIN_HADRON */ +databricks_metrics *databricks_metrics_shared; + +Size +DatabricksMetricsShmemSize(void) +{ + return sizeof(databricks_metrics); +} + +void +DatabricksMetricsShmemInit(void) +{ + bool found; + + databricks_metrics_shared = + ShmemInitStruct("Databricks counters", + DatabricksMetricsShmemSize(), + &found); + Assert(found == IsUnderPostmaster); + if (!found) + { + pg_atomic_init_u32(&databricks_metrics_shared->index_corruption_count, 0); + pg_atomic_init_u32(&databricks_metrics_shared->data_corruption_count, 0); + pg_atomic_init_u32(&databricks_metrics_shared->internal_error_count, 0); + pg_atomic_init_u32(&databricks_metrics_shared->ps_corruption_detected, 0); + } +} +/* END_HADRON */ neon_per_backend_counters *neon_per_backend_counters_shared; @@ -38,11 +67,12 @@ NeonPerfCountersShmemRequest(void) #else size = mul_size(NUM_NEON_PERF_COUNTER_SLOTS, sizeof(neon_per_backend_counters)); #endif + if (lakebase_mode) { + size = add_size(size, DatabricksMetricsShmemSize()); + } RequestAddinShmemSpace(size); } - - void NeonPerfCountersShmemInit(void) { @@ -361,6 +391,13 @@ neon_get_perf_counters(PG_FUNCTION_ARGS) neon_per_backend_counters totals = {0}; metric_t *metrics; + /* BEGIN_HADRON */ + WalproposerShmemState *wp_shmem; + uint32 num_safekeepers; + uint32 num_active_safekeepers; + XLogRecPtr max_active_safekeeper_commit_lag; + /* END_HADRON */ + /* We put all the tuples into a tuplestore in one go. */ InitMaterializedSRF(fcinfo, 0); @@ -395,6 +432,72 @@ neon_get_perf_counters(PG_FUNCTION_ARGS) metric_to_datums(&metrics[i], &values[0], &nulls[0]); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); } + + /* BEGIN_HADRON */ + if (databricks_test_hook == TestHookCorruption) { + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("test corruption"))); + } + + // Not ideal but piggyback our databricks counters into the neon perf counters view + // so that we don't need to introduce neon--1.x+1.sql to add a new view. + { + // Keeping this code in its own block to work around the C90 "don't mix declarations and code" rule when we define + // the `databricks_metrics` array in the next block. Yes, we are seriously dealing with C90 rules in 2025. + + // Read safekeeper status from wal proposer shared memory first. + // Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is + // consistent with the active wal acceptors count. Assuming that we don't query this view too often the mutex should + // not be a huge deal. + XLogRecPtr min_commit_lsn = InvalidXLogRecPtr; + XLogRecPtr max_commit_lsn = InvalidXLogRecPtr; + XLogRecPtr lsn; + + wp_shmem = GetWalpropShmemState(); + SpinLockAcquire(&wp_shmem->mutex); + + num_safekeepers = wp_shmem->num_safekeepers; + num_active_safekeepers = 0; + for (int i = 0; i < num_safekeepers; i++) { + if (wp_shmem->safekeeper_status[i] == 1) { + num_active_safekeepers++; + // Only track the commit LSN lag among active safekeepers. + // If there are inactive safekeepers we will raise another alert so this lag value + // is less critical. + lsn = wp_shmem->safekeeper_commit_lsn[i]; + if (XLogRecPtrIsInvalid(min_commit_lsn) || lsn < min_commit_lsn) { + min_commit_lsn = lsn; + } + if (XLogRecPtrIsInvalid(max_commit_lsn) || lsn > max_commit_lsn) { + max_commit_lsn = lsn; + } + } + } + // Calculate max commit LSN lag across active safekeepers + max_active_safekeeper_commit_lag = (XLogRecPtrIsInvalid(min_commit_lsn) ? 0 : max_commit_lsn - min_commit_lsn); + + SpinLockRelease(&wp_shmem->mutex); + } + { + metric_t databricks_metrics[] = { + {"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)}, + {"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)}, + {"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)}, + {"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)}, + {"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers}, + {"num_configured_safekeepers", false, 0.0, (double) num_safekeepers}, + {"max_active_safekeeper_commit_lag", false, 0.0, (double) max_active_safekeeper_commit_lag}, + {NULL, false, 0, 0}, + }; + for (int i = 0; databricks_metrics[i].name != NULL; i++) + { + metric_to_datums(&databricks_metrics[i], &values[0], &nulls[0]); + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); + } + } + /* END_HADRON */ + pfree(metrics); return (Datum) 0; diff --git a/pgxn/neon/neon_perf_counters.h b/pgxn/neon/neon_perf_counters.h index 4b611b0636..6a6e16cd26 100644 --- a/pgxn/neon/neon_perf_counters.h +++ b/pgxn/neon/neon_perf_counters.h @@ -181,5 +181,24 @@ extern void inc_query_time(uint64 elapsed); extern Size NeonPerfCountersShmemSize(void); extern void NeonPerfCountersShmemInit(void); +/* BEGIN_HADRON */ +typedef struct +{ + pg_atomic_uint32 index_corruption_count; + pg_atomic_uint32 data_corruption_count; + pg_atomic_uint32 internal_error_count; + pg_atomic_uint32 ps_corruption_detected; +} databricks_metrics; + +extern databricks_metrics *databricks_metrics_shared; + +extern Size DatabricksMetricsShmemSize(void); +extern void DatabricksMetricsShmemInit(void); + +extern int databricks_test_hook; + +static const int TestHookCorruption = 1; +/* END_HADRON */ + #endif /* NEON_PERF_COUNTERS_H */ diff --git a/pgxn/neon/walproposer.h b/pgxn/neon/walproposer.h index 5507294c3b..a51133d897 100644 --- a/pgxn/neon/walproposer.h +++ b/pgxn/neon/walproposer.h @@ -430,6 +430,12 @@ typedef struct WalproposerShmemState /* BEGIN_HADRON */ /* The WAL rate limiter */ WalRateLimiter wal_rate_limiter; + /* Number of safekeepers in the config */ + uint32 num_safekeepers; + /* Per-safekeeper status flags: 0=inactive, 1=active */ + uint8 safekeeper_status[MAX_SAFEKEEPERS]; + /* Per-safekeeper commit LSN for metrics */ + XLogRecPtr safekeeper_commit_lsn[MAX_SAFEKEEPERS]; /* END_HADRON */ } WalproposerShmemState; diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 9b9cb4b3e3..1be236e4d8 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 9b9cb4b3e33347aea8f61e606bb6569979516de5 +Subproject commit 1be236e4d85167244ca6962728c8e94fcb03edb9 diff --git a/vendor/revisions.json b/vendor/revisions.json index 7212c9f7c7..b271cda483 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -5,7 +5,7 @@ ], "v16": [ "16.9", - "9b9cb4b3e33347aea8f61e606bb6569979516de5" + "1be236e4d85167244ca6962728c8e94fcb03edb9" ], "v15": [ "15.13",