Report metrics on data/index corruption (#12729)

## Problem

We don't have visibility into data/index corruption.

## Summary of changes
Add data/index corruption metrics.

PG emits these corruption errors by raising an ERROR with the corresponding errcode.

PG Changes: https://github.com/neondatabase/postgres/pull/698
This commit is contained in:
Suhas Thalanki
2025-07-29 14:08:24 -04:00
committed by GitHub
parent 65d1be6e90
commit bf3a1529bf
4 changed files with 115 additions and 4 deletions

View File

@@ -1440,7 +1440,6 @@ check_neon_id(char **newval, void **extra, GucSource source)
return **newval == '\0' || HexDecodeString(id, *newval, 16);
}
void
PagestoreShmemInit(void)
{

View File

@@ -51,6 +51,7 @@ void _PG_init(void);
bool lakebase_mode = false;
static int running_xacts_overflow_policy;
static emit_log_hook_type prev_emit_log_hook;
static bool monitor_query_exec_time = false;
static ExecutorStart_hook_type prev_ExecutorStart = NULL;
@@ -81,6 +82,8 @@ uint32 WAIT_EVENT_NEON_PS_READ;
uint32 WAIT_EVENT_NEON_WAL_DL;
#endif
int databricks_test_hook = 0;
enum RunningXactsOverflowPolicies {
OP_IGNORE,
OP_SKIP,
@@ -445,6 +448,20 @@ ReportSearchPath(void)
static int neon_pgstat_file_size_limit;
#endif
/*
 * emit_log_hook implementation that counts corruption and internal errors.
 *
 * Chains to the previously installed hook first (if there is one), then
 * increments the matching shared counter when the report's error code is
 * ERRCODE_DATA_CORRUPTED, ERRCODE_INDEX_CORRUPTED or ERRCODE_INTERNAL_ERROR.
 * Reports carrying any other error code are not counted.
 *
 * edata: the error report being emitted; only sqlerrcode is read here.
 */
static void
DatabricksSqlErrorHookImpl(ErrorData *edata)
{
	if (prev_emit_log_hook != NULL)
	{
		prev_emit_log_hook(edata);
	}

	/*
	 * This hook is installed from _PG_init(), whereas the counters are
	 * attached separately by DatabricksMetricsShmemInit() from the shmem
	 * startup hook. A report emitted before that has happened must not
	 * dereference the still-NULL pointer, so just skip the accounting.
	 */
	if (databricks_metrics_shared == NULL)
	{
		return;
	}

	if (edata->sqlerrcode == ERRCODE_DATA_CORRUPTED)
	{
		pg_atomic_fetch_add_u32(&databricks_metrics_shared->data_corruption_count, 1);
	}
	else if (edata->sqlerrcode == ERRCODE_INDEX_CORRUPTED)
	{
		pg_atomic_fetch_add_u32(&databricks_metrics_shared->index_corruption_count, 1);
	}
	else if (edata->sqlerrcode == ERRCODE_INTERNAL_ERROR)
	{
		pg_atomic_fetch_add_u32(&databricks_metrics_shared->internal_error_count, 1);
	}
}
void
_PG_init(void)
{
@@ -456,6 +473,11 @@ _PG_init(void)
load_file("$libdir/neon_rmgr", false);
#endif
if (lakebase_mode) {
prev_emit_log_hook = emit_log_hook;
emit_log_hook = DatabricksSqlErrorHookImpl;
}
/*
* Initializing a pre-loaded Postgres extension happens in three stages:
*
@@ -594,6 +616,19 @@ _PG_init(void)
0,
NULL, NULL, NULL);
// A test hook used in sql regress to trigger specific behaviors
// to test features easily.
DefineCustomIntVariable(
"databricks.test_hook",
"The test hook used in sql regress tests only",
NULL,
&databricks_test_hook,
0,
0, INT32_MAX,
PGC_SUSET,
0,
NULL, NULL, NULL);
/*
* Important: This must happen after other parts of the extension are
* loaded, otherwise any settings to GUCs that were set before the
@@ -816,6 +851,9 @@ neon_shmem_startup_hook(void)
LfcShmemInit();
NeonPerfCountersShmemInit();
if (lakebase_mode) {
DatabricksMetricsShmemInit();
}
PagestoreShmemInit();
RelsizeCacheShmemInit();
WalproposerShmemInit();

View File

@@ -19,7 +19,35 @@
#include "neon.h"
#include "neon_perf_counters.h"
#include "neon_pgversioncompat.h"
#include "walproposer.h"
/* BEGIN_HADRON */
/*
 * Pointer to the Databricks error counters. As a file-scope object it starts
 * out NULL; DatabricksMetricsShmemInit() points it at the shared-memory
 * struct.
 */
databricks_metrics *databricks_metrics_shared;
/*
 * Report how many bytes of shared memory the Databricks counters occupy,
 * i.e. the size of one databricks_metrics struct.
 */
Size
DatabricksMetricsShmemSize(void)
{
	const Size	nbytes = sizeof(databricks_metrics);

	return nbytes;
}
void
DatabricksMetricsShmemInit(void)
{
bool found;
databricks_metrics_shared =
ShmemInitStruct("Databricks counters",
DatabricksMetricsShmemSize(),
&found);
Assert(found == IsUnderPostmaster);
if (!found)
{
pg_atomic_init_u32(&databricks_metrics_shared->index_corruption_count, 0);
pg_atomic_init_u32(&databricks_metrics_shared->data_corruption_count, 0);
pg_atomic_init_u32(&databricks_metrics_shared->internal_error_count, 0);
}
}
/* END_HADRON */
neon_per_backend_counters *neon_per_backend_counters_shared;
@@ -38,11 +66,12 @@ NeonPerfCountersShmemRequest(void)
#else
size = mul_size(NUM_NEON_PERF_COUNTER_SLOTS, sizeof(neon_per_backend_counters));
#endif
if (lakebase_mode) {
size = add_size(size, DatabricksMetricsShmemSize());
}
RequestAddinShmemSpace(size);
}
void
NeonPerfCountersShmemInit(void)
{
@@ -395,6 +424,33 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
metric_to_datums(&metrics[i], &values[0], &nulls[0]);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
}
if (lakebase_mode) {
if (databricks_test_hook == TestHookCorruption) {
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("test corruption")));
}
// Not ideal but piggyback our databricks counters into the neon perf counters view
// so that we don't need to introduce neon--1.x+1.sql to add a new view.
{
metric_t databricks_metrics[] = {
{"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)},
{"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)},
{"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)},
{NULL, false, 0, 0},
};
for (int i = 0; databricks_metrics[i].name != NULL; i++)
{
metric_to_datums(&databricks_metrics[i], &values[0], &nulls[0]);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
}
}
/* END_HADRON */
}
pfree(metrics);
return (Datum) 0;

View File

@@ -177,5 +177,23 @@ extern void inc_query_time(uint64 elapsed);
extern Size NeonPerfCountersShmemSize(void);
extern void NeonPerfCountersShmemInit(void);
/* BEGIN_HADRON */
/*
 * Error counters kept in shared memory (see DatabricksMetricsShmemInit).
 * Each field is incremented by the emit_log hook when a report carries the
 * corresponding error code, and is exposed through neon_get_perf_counters()
 * as sql_index_corruption_count, sql_data_corruption_count and
 * sql_internal_error_count respectively.
 */
typedef struct
{
/* reports with ERRCODE_INDEX_CORRUPTED */
pg_atomic_uint32 index_corruption_count;
/* reports with ERRCODE_DATA_CORRUPTED */
pg_atomic_uint32 data_corruption_count;
/* reports with ERRCODE_INTERNAL_ERROR */
pg_atomic_uint32 internal_error_count;
} databricks_metrics;
/* Pointer to the shared counters; assigned by DatabricksMetricsShmemInit(). */
extern databricks_metrics *databricks_metrics_shared;
/* Size in bytes of the shared-memory area holding the counters. */
extern Size DatabricksMetricsShmemSize(void);
/* Attaches databricks_metrics_shared, zeroing the counters on first creation. */
extern void DatabricksMetricsShmemInit(void);
/* Backing variable of the "databricks.test_hook" GUC (regression tests only). */
extern int databricks_test_hook;
/*
 * databricks_test_hook value that makes neon_get_perf_counters() raise an
 * ERRCODE_DATA_CORRUPTED error ("test corruption") when lakebase_mode is on.
 */
static const int TestHookCorruption = 1;
/* END_HADRON */
#endif /* NEON_PERF_COUNTERS_H */