[BRC-3082] Monitor commit LSN lag among active SKs (#1002)

Commit
e69c3d632b
added metrics (used for alerting) to indicate whether Safekeepers are
operating with a degraded quorum due to some of them being down.
However, even if all SKs are active/reachable, we probably still want to
raise an alert if some of them are really slow or otherwise lagging
behind, as it is technically still a "degraded quorum" situation.

Added a new field `max_active_safekeeper_commit_lag` to the
`neon_perf_counters` view that reports the lag between the most advanced
and most lagging commit LSNs among active Safekeepers.

Commit LSNs are received from `AppendResponse` messages from SKs and
recorded in the `WalProposer`'s shared memory state.

Note that this lag is calculated among active SKs only to keep this
alert clean. If there are inactive SKs the previous metric on active SKs
will capture that instead.

Note: @chen-luo_data pointed out during the PR review that we can
probably get the benefits of this metric with a PromQL query like `max
(safekeeper_commit_lsn) by (tenant_id, timeline_id) -
min(safekeeper_commit_lsn) by (tenant_id, timeline_id)` on the existing
metrics exported by SKs.

Given that this code is already ready, @haoyu-huang_data suggested that
I just check in this change anyway, as the reliability of prometheus
metrics (and especially the aggregation operators when the result set
cardinality is high) is somewhat questionable based on our prior
experience.

Added integration test
`test_wal_acceptor.py::test_max_active_safekeeper_commit_lag`.
This commit is contained in:
William Huang
2025-05-13 22:42:31 -07:00
committed by Suhas Thalanki
parent 4897921de6
commit fae734b15c
9 changed files with 5553 additions and 15 deletions

499
kind/endpoint_tests.py Normal file
View File

@@ -0,0 +1,499 @@
import json
import logging
import re
import time
import uuid
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import backoff
import jwt
import psycopg2
import requests
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from kind_test_env import (
HADRON_COMPUTE_IMAGE_NAME,
HADRON_MT_IMAGE_NAME,
HadronEndpoint,
KindTestEnvironment,
NodeType,
check_prerequisites,
read_table_data,
unique_node_id,
write_table_data,
)
# Attach a stream handler to the "backoff" logger so that the records it emits for the
# @backoff.on_exception-decorated helpers below are actually written out.
logging.getLogger("backoff").addHandler(logging.StreamHandler())
def read_private_key_and_public_key(
    privkey_filename: str, certificate_path: str
) -> tuple[str, str]:
    """
    Load the private key text and compute the thumbprint of the matching certificate.
    :param privkey_filename: path of the private key file; its text content is returned unchanged.
    :param certificate_path: path of a PEM-encoded X.509 certificate.
    :return: tuple of (private key text, hex-encoded SHA-256 fingerprint of the certificate).
    """
    # The certificate is read first, exactly like the key below: whole file in one go.
    pem_bytes = Path(certificate_path).read_bytes()
    cert = x509.load_pem_x509_certificate(pem_bytes, default_backend())
    thumbprint = cert.fingerprint(hashes.SHA256()).hex()
    key_text = Path(privkey_filename).read_text()
    return key_text, thumbprint
def generate_token(
    testuser: str,
    endpoint_id: str,
    database: str,
    cert_directory: str,
    payload: dict[str, Any] | None = None,
) -> str:
    """
    Build an RS256-signed JWT for a test user from the key pair stored in `cert_directory`.
    :param testuser: user name to generate the token for (becomes the "sub" claim).
    :param endpoint_id: hadron endpoint id (becomes the "endpointId" claim).
    :param database: database to connect to (becomes the "database" claim).
    :param cert_directory: directory containing privkey1.pem and pubkey1.pem used for signing.
    :param payload: additional claims. They are merged over the default claims, so they win on conflict.
    :return: the encoded token; its "kid" header carries the certificate thumbprint.
    """
    key_path = f"{cert_directory}/privkey1.pem"
    cert_path = f"{cert_directory}/pubkey1.pem"
    signing_key, thumbprint = read_private_key_and_public_key(key_path, cert_path)
    # The token expires 10 minutes from now.
    expires_at = int(time.time() + (10 * 60))
    claims: dict[str, Any] = {
        "sub": testuser,
        "iss": "brickstore.databricks.com",
        "exp": expires_at,
        "endpointId": endpoint_id,
        "database": database,
    }
    # Caller-supplied claims override the defaults; the caller's dict itself is left untouched.
    claims.update(payload or {})
    return jwt.encode(claims, signing_key, algorithm="RS256", headers={"kid": thumbprint})
def check_token_login(
    endpoint: HadronEndpoint, endpoint_id: str, database: str, cert_directory: str
) -> None:
    """
    Verify that a freshly created role can log in to the endpoint with a JWT as its password.
    :param endpoint: endpoint to connect to.
    :param endpoint_id: hadron endpoint id embedded in the token.
    :param database: database name embedded in the token.
    :param cert_directory: directory with the key pair used to sign the token.
    """
    testuser = "test_token_user"
    # The role has no password of its own; the token is what gets presented at login.
    with endpoint.cursor() as admin_cur:
        admin_cur.execute(f"CREATE ROLE {testuser} LOGIN PASSWORD NULL;")
    jwt_password = generate_token(testuser, endpoint_id, database, cert_directory)
    with endpoint.cursor(user=testuser, password=jwt_password) as user_cur:
        user_cur.execute("select current_user")
        logged_in_as = user_cur.fetchone()
        assert logged_in_as == (testuser,)
def check_databricks_roles(endpoint: HadronEndpoint) -> None:
    """
    Verify that each expected Databricks role has an entry in pg_roles on the endpoint.
    :param endpoint: endpoint whose roles are inspected.
    """
    with endpoint.cursor() as cur:
        for role in ("databricks_monitor", "databricks_control_plane", "databricks_gateway"):
            cur.execute(f"SELECT 1 FROM pg_roles WHERE rolname = '{role}';")
            row = cur.fetchone()
            # Exactly one row containing the literal 1 means the role exists.
            assert row == (1,)
def test_create_endpoint_and_connect() -> None:
"""
Tests that we can create an endpoint on a Hadron deployment and connect to it/run simple queries.
The same endpoint is then used to exercise certificate and token login, metrics export, PG log
redaction, tenant migration, a compute restart, config reconciliation and endpoint deletion.
"""
with KindTestEnvironment() as env:
# Make both images available inside the test cluster before anything is deployed.
env.load_image(HADRON_MT_IMAGE_NAME)
env.load_image(HADRON_COMPUTE_IMAGE_NAME)
# Setup the Hadron deployment in the brand new KIND cluster.
# We have 2 PS, 3 SK, and mocked S3 storage in the test.
env.start_hcc_with_configmaps(
"configmaps/configmap_2ps_3sk.yaml", "configmaps/safe_configmap_1.yaml"
)
# Node IDs of the two pageservers (pool 0, ordinals 0 and 1). They are reused further down
# to pick the target of the tenant migration.
ps_id_0 = unique_node_id(0, 0)
ps_id_1 = unique_node_id(0, 1)
# Wait for all the Hadron storage components to come up.
env.wait_for_app_ready("storage-controller", namespace="hadron")
env.wait_for_statefulset_replicas("safe-keeper-0", replicas=3, namespace="hadron")
env.wait_for_statefulset_replicas("page-server-0", replicas=2, namespace="hadron")
# Wait until both pageservers and all three safekeepers show up as registered nodes.
env.wait_for_node_registration(NodeType.PAGE_SERVER, {ps_id_0, ps_id_1})
env.wait_for_node_registration(
NodeType.SAFE_KEEPER, {unique_node_id(0, 0), unique_node_id(0, 1), unique_node_id(0, 2)}
)
@backoff.on_exception(backoff.expo, psycopg2.Error, max_tries=10)
def check_superuser_and_basic_data_operation(endpoint):
    """
    Run a basic write/read round trip as the default user and verify superuser and SSL status.
    Retried with exponential backoff (up to 10 tries) when psycopg2 raises an error.
    """
    with endpoint.cursor() as cur:
        # Is the user we are connected as a superuser?
        cur.execute("SELECT usesuper FROM pg_user WHERE usename = CURRENT_USER")
        superuser_flag = cur.fetchone()[0]
        # Recreate a small table, fill it, and read it back.
        for statement in (
            "DROP TABLE IF EXISTS t",
            "CREATE TABLE t (x int)",
            "INSERT INTO t VALUES (1), (2), (3)",
            "SELECT * FROM t",
        ):
            cur.execute(statement)
        rows = cur.fetchall()
        # The HCC-created default user must be the superuser, and the rows read back must be
        # exactly the rows inserted above.
        assert superuser_flag is True, "Current user is not a superuser"
        assert rows == [(1,), (2,), (3,)], f"Data retrieval mismatch: {rows}"
        # The server must have SSL turned on.
        cur.execute("SHOW SSL;")
        server_ssl = cur.fetchone()
        assert server_ssl == ("on",)
        # This very connection must be using SSL as well.
        cur.execute("SELECT SSL FROM pg_stat_ssl WHERE pid = pg_backend_pid();")
        connection_ssl = cur.fetchone()
        assert connection_ssl == (True,)
@backoff.on_exception(backoff.expo, psycopg2.Error, max_tries=10)
def check_databricks_system_tables(endpoint):
    """
    Verify through the LFC stats table in the databricks_system database that the LFC is running.
    Retried with exponential backoff (up to 10 tries) when psycopg2 raises an error.
    """
    # When the LFC is not running the stats table holds a single all-NULL row, which the
    # IS NOT NULL filter below excludes.
    lfc_stats_query = "SELECT 1 FROM neon.NEON_STAT_FILE_CACHE WHERE file_cache_misses IS NOT NULL;"
    with endpoint.cursor(dbname="databricks_system") as cur:
        cur.execute(lfc_stats_query)
        lfc_rows = cur.fetchall()
        assert len(lfc_rows) == 1, "LFC stats table is empty"
# Check that the system-level GUCs are set to the expected values. These should be set before the endpoint
# starts accepting connections.
def check_guc_values(endpoint):
    """
    Read each Databricks GUC with SHOW, print it, and assert that it has the expected value.
    :param endpoint: endpoint to query; only its `cursor()` context manager is used.
    :raises AssertionError: naming the first GUC whose value differs from the expected one.
    """
    # `test_workspace_url` comes from the surrounding scope and is looked up when this function
    # is called, so it only has to be defined before the first call.
    expected_gucs = {
        "databricks.workspace_url": urlparse(test_workspace_url).hostname,
        "databricks.enable_databricks_identity_login": "on",
        "databricks.enable_sql_restrictions": "on",
        "databricks.disable_PAT_login": "on",
    }
    with endpoint.cursor() as cur:
        # One table-driven loop keeps the SQL, the printed label and the failure message in sync.
        for guc_name, expected_value in expected_gucs.items():
            cur.execute(f"SHOW {guc_name};")
            res = cur.fetchone()[0]
            print(f"{guc_name}: {res}")
            assert res == expected_value, f"Failed to get the correct {guc_name} GUC value"
def check_cert_auth_user(endpoint):
    """
    Open a connection as the control-plane role using the client certificate and key, and verify
    that the session really runs as that role.
    """
    expected_user = "databricks_control_plane"
    # The certificate and key files are taken from the test environment's temp directory.
    cert_login_options = {
        "user": expected_user,
        "sslcert": f"{env.tempDir.name}/pubkey1.pem",
        "sslkey": f"{env.tempDir.name}/privkey1.pem",
        "sslmode": "require",
    }
    with endpoint.cursor(**cert_login_options) as cur:
        cur.execute("select current_user;")
        current_user = cur.fetchone()[0]
        assert current_user == expected_user, f"{current_user} is not {expected_user}"
# Ask Postgres (GUC "neon.pageserver_connstring") which pageserver the compute node currently talks to.
def check_current_ps_id(endpoint: HadronEndpoint) -> int:
    """
    Return the node ID of the pageserver named in the endpoint's current connection string.
    :param endpoint: compute endpoint to query.
    :return: pageserver ID derived from the pool and ordinal numbers in the host name.
    """
    with endpoint.cursor() as cur:
        cur.execute("SHOW neon.pageserver_connstring;")
        res = cur.fetchone()
        assert res is not None, "Failed to get the current pageserver connection URL"
        connection_url = res[0]
        print(f"Current pageserver connection URL is {connection_url}")
        # In this test the host looks like
        # "page-server-{pool}-{ordinal}.page-server.hadron.svc.cluster.local": the first DNS label
        # is the pod name, and its last two dash-separated fields are the pool and the ordinal.
        pod_label = urlparse(connection_url).hostname.partition(".")[0]
        pool_id, ordinal_id = pod_label.split("-")[-2:]
        return unique_node_id(int(pool_id), int(ordinal_id))
def verify_compute_pod_metadata(pod_name: str, pod_namespace: str, endpoint_id: uuid.UUID):
    """
    Assert that the compute pod carries the dblet-required labels and annotations (see go/dblet-labels).
    :param pod_name: name of the compute pod.
    :param pod_namespace: namespace the pod lives in.
    :param endpoint_id: endpoint the pod belongs to; the app id label must be "<endpoint_id>-0".
    """
    pod_metadata = env.kubectl_get_pod(namespace=pod_namespace, pod_name=pod_name)["metadata"]
    expected_host = urlparse(test_workspace_url).hostname
    assert pod_metadata["annotations"]["databricks.com/workspace-url"] == expected_host
    assert pod_metadata["labels"]["orgId"] == test_workspace_id
    assert pod_metadata["labels"]["dblet.dev/appid"] == f"{endpoint_id}-0"
def check_pg_log_redaction(pod_name: str, container_name: str, pod_namespace: str):
    """
    Various checks to ensure that the PG log redactor is working as expected via comparing
    PG log vs. redacted log.
    Checks between original from PG and redacted log:
    - log folders exist
    - there's at least 1 log file
    - the number of files match
    - the redactor is at most a few entries behind PG in the newest file
    - the last redacted log entry is in the last few PG log entries, ignoring the
    redacted message field
    :param pod_name: name of the compute pod to inspect.
    :param container_name: container of that pod in which the shell commands are executed.
    :param pod_namespace: namespace of the compute pod.
    :raises AssertionError: if any of the checks above fails.
    """
    # a little higher than redactor flush entries
    lag_tolerance_items = 22
    MESSAGE_FIELD = "message"
    LOG_DAEMON_EXPECTED_REGEX = r"hadron-compute-redacted-[a-zA-Z]{3}[0-9]{4}\.json"
    redactor_file_count_catchup_timeout_seconds = 60

    def kex(command: list[str]) -> str:
        # mypy can't tell kubectl_exec returns str
        result: str = env.kubectl_exec(pod_namespace, pod_name, container_name, command)
        return result

    def parse_without_message(raw_entry: str) -> Any:
        """Parse one JSON log line and drop the message field, which the redactor may rewrite."""
        entry = json.loads(raw_entry)
        if MESSAGE_FIELD in entry:
            del entry[MESSAGE_FIELD]
        return entry

    log_folders = kex(["ls", "/databricks/logs/"]).split()
    assert "brickstore" in log_folders, "PG log folder not found"
    assert "brickstore-redacted" in log_folders, "Redacted log folder not found"

    @backoff.on_exception(
        backoff.expo, AssertionError, max_time=redactor_file_count_catchup_timeout_seconds
    )
    def get_caught_up_files() -> tuple[list[str], list[str]]:
        """
        Get pg (input) and redacted (output) log files after verifying the files exist and the counts match.
        @return: tuple of:
        - list of pg log file names
        - list of redacted log file names
        """
        # `ls -t` lists the most recently modified file first.
        pg_log_files = kex(["ls", "-t", "/databricks/logs/brickstore/"]).split()
        redacted_log_files = kex(
            ["ls", "-t", "/databricks/logs/brickstore-redacted/"]
        ).split()
        pg_log_files = [file for file in pg_log_files if ".json" in file]
        print("Compute log files:", pg_log_files, redacted_log_files)
        assert len(pg_log_files) > 0, "PG didn't produce any JSON log files"
        assert len(redacted_log_files) > 0, "Redactor didn't produce any log files"
        assert len(pg_log_files) == len(
            redacted_log_files
        ), "Redactor didn't process each log file exactly once"
        for file in redacted_log_files:
            assert re.match(
                LOG_DAEMON_EXPECTED_REGEX, file
            ), f"Unexpected redacted log file name: {file}"
        return pg_log_files, redacted_log_files

    # wait for pg_log_redactor to catch up, by file count
    pg_log_files, redacted_log_files = get_caught_up_files()

    # Rest will examine latest files closer
    last_pg_log_file = pg_log_files[0]
    last_redacted_log_file = redacted_log_files[0]
    pg_log_entries_num = int(
        kex(["wc", "-l", f"/databricks/logs/brickstore/{last_pg_log_file}"]).split()[0]
    )
    redacted_log_entries_num = int(
        kex(
            ["wc", "-l", f"/databricks/logs/brickstore-redacted/{last_redacted_log_file}"]
        ).split()[0]
    )
    assert (
        redacted_log_entries_num <= pg_log_entries_num
    ), "Redactor emitted non-PG log messages, either through bug or own error msg."
    # The lag is how far the redactor is *behind* PG, i.e. pg - redacted. Subtracting the other
    # way round can never be positive after the assertion above, which would make this check
    # impossible to fail.
    assert (
        pg_log_entries_num - redacted_log_entries_num < lag_tolerance_items
    ), "Redactor lagged behind, more than OS buffering should allow for."

    # Order to decrease chance of lag flakiness
    pg_log_tail = kex(
        [
            "tail",
            "-n",
            str(lag_tolerance_items),
            f"/databricks/logs/brickstore/{last_pg_log_file}",
        ]
    )
    redacted_log_tail_item = kex(
        [
            "tail",
            "-n",
            "1",
            f"/databricks/logs/brickstore-redacted/{last_redacted_log_file}",
        ]
    )
    redacted_log_tail_json = parse_without_message(redacted_log_tail_item)

    found_in_pg_log = False
    # splitlines() plus the blank-line guard keeps a trailing newline in the command output from
    # being handed to json.loads, which would raise instead of reaching the assertion below.
    for pg_log_item in pg_log_tail.splitlines():
        if not pg_log_item.strip():
            continue
        if redacted_log_tail_json == parse_without_message(pg_log_item):
            found_in_pg_log = True
            break
    # Note: some lag is possible because the tail calls are not synchronized with the lag check
    # above, which is why the last redacted entry is searched for among the last few PG entries
    # rather than compared with the very last one.
    assert (
        found_in_pg_log
    ), "Last log seen in redactor is not a recent log from PG, through lag bug or own error msg"
# Create an endpoint with random IDs.
test_metastore_id = uuid.uuid4()
test_endpoint_id = uuid.uuid4()
test_workspace_id = "987654321"
test_workspace_url = "https://test-workspace-url/"
# Filled in once the endpoint exists; the restart, reconciliation and deletion steps below refer
# to the compute by these two values.
compute_namespace = ""
compute_name = ""
with env.hcc_create_endpoint(
test_metastore_id, test_endpoint_id, test_workspace_id, test_workspace_url
) as endpoint:
# Connectivity, system tables, roles, GUCs and both login methods (certificate and token).
check_superuser_and_basic_data_operation(endpoint)
check_databricks_system_tables(endpoint)
check_databricks_roles(endpoint)
check_guc_values(endpoint)
check_cert_auth_user(endpoint)
check_token_login(endpoint, str(test_endpoint_id), "postgres", env.tempDir.name)
# Write a small table now; it is read back after the migration and after the restart.
write_table_data(endpoint, "my_table", ["a", "b", "c"])
assert read_table_data(endpoint, "my_table") == ["a", "b", "c"]
compute_name = endpoint.name
compute_namespace = endpoint.namespace
# The first pod listed in the compute namespace is taken to be the compute pod.
hadron_compute_pods = env.kubectl_pods(namespace=compute_namespace)
hadron_compute_pod_name = hadron_compute_pods[0]
verify_compute_pod_metadata(
hadron_compute_pod_name, compute_namespace, test_endpoint_id
)
# Check in compute log that we have initialized the Databricks extension.
logs = env.kubectl_logs(namespace=compute_namespace, pod_name=hadron_compute_pod_name)
assert "Databricks extension initialized" in logs, "Endpoint creation not logged"
# Check that metrics are exported
r = requests.get(endpoint.metrics_url)
assert r.status_code == 200
assert "pg_static" in r.text
# Check for this particular metric to make sure prometheus exporter has the permission to
# execute wal-related functions such as `pg_current_wal_lsn`, `pg_wal_lsn_diff`, etc.
assert "pg_replication_slots_pg_wal_lsn_diff" in r.text
# Check for these metrics from function or view in neon schema in databricks_system database
# to ensure extra grants were successful
assert "pg_backpressure_throttling_time" in r.text
assert "pg_lfc_hits" in r.text
assert "pg_lfc_working_set_size" in r.text
assert "pg_cluster_size_bytes" in r.text
assert "pg_snapshot_files_count" in r.text
assert re.search(r"pg_writable_bool{.*} 1", r.text)
# This metric is expected to be absent from the exported metrics.
assert "pg_database_size_bytes" not in r.text
# Check for this label key to ensure that the metrics are being labeled correctly for PuPr model
assert "pg_instance_id=" in r.text
assert "pg_metrics_sql_index_corruption_count" in r.text
# Safekeeper quorum metrics, including the commit LSN lag among active safekeepers.
assert "pg_metrics_num_active_safekeepers" in r.text
assert "pg_metrics_num_configured_safekeepers" in r.text
assert "pg_metrics_max_active_safekeeper_commit_lag" in r.text
check_pg_log_redaction(hadron_compute_pod_name, endpoint.container, compute_namespace)
# Smoke test tenant migration
curr_ps_id = check_current_ps_id(endpoint)
# Migrate to whichever of the two pageservers the endpoint is not currently connected to.
new_ps_id = ps_id_0 if curr_ps_id == ps_id_1 else ps_id_1
env.hcc_migrate_endpoint(test_endpoint_id, new_ps_id)
assert check_current_ps_id(endpoint) == new_ps_id
# Check that data operation still works after migration
check_superuser_and_basic_data_operation(endpoint)
# Check that the data we wrote before migration stays untouched
assert read_table_data(endpoint, "my_table") == ["a", "b", "c"]
# Restart the compute endpoint to clear any local caches to be extra sure that tenant migration indeed
# does not lose data
with env.restart_endpoint(compute_name, compute_namespace) as endpoint:
# Check that data persists after the compute node restarts.
assert read_table_data(endpoint, "my_table") == ["a", "b", "c"]
# Check that data operations can resume after the restart
check_superuser_and_basic_data_operation(endpoint)
# PG compute reconciliation verification test. We intentionally run this test after the tenant migration
# restart test so that the tenant migration test can observe the first, "untainted" restart.
#
# Update the cluster-config map's default PgParams and ensure that the compute instance is reconciled properly.
# In this case, test updating the compute_http_port as a trivial example.
current_configmap = env.get_configmap_json("cluster-config", "hadron")
config_map_key = "config.json"
config_json = json.loads(current_configmap["data"][config_map_key])
if "pg_params" not in config_json:
config_json["pg_params"] = {}
test_http_port = 3456
config_json["pg_params"]["compute_http_port"] = test_http_port
# Only the "config.json" entry of the config map is patched.
patch_json = {"data": {"config.json": json.dumps(config_json)}}
env.kubectl_patch(
resource="configmap",
name="cluster-config",
namespace="hadron",
json_patch=json.dumps(patch_json),
)
# Ensure that the deployment is updated by the HCC accordingly within 2 minutes.
# Note that waiting this long makes sense since files mounted via config maps
# are not updated immediately, and are instead updated every kubelet sync period (typically 1m)
# on top of the kubelet's configmap cache TTL (which is also typically 1m).
timeout = 120
start_time = time.time()
# Poll the deployment spec every 5 seconds until the new port shows up or the timeout expires.
while True:
deployment = env.get_deployment_json(compute_name, compute_namespace)
# The port is read from the second port entry of the first container; presumably that entry
# is the PG HTTP port (the message printed below calls it that) -- confirm against the deployment spec.
port = deployment["spec"]["template"]["spec"]["containers"][0]["ports"][1]
if port["containerPort"] == test_http_port:
print("Compute succesfully updated")
break
else:
print(
f"Current PG HTTP port spec: {port}, Expected container port: {test_http_port}"
)
if time.time() - start_time >= timeout:
raise Exception(f"Compute deployment did not update within {timeout} seconds")
time.sleep(5)
# Wait for the updated endpoint to become ready again after a rollout.
# NOTE(review): the namespace is hard-coded here while every other call in this test uses
# compute_namespace -- confirm the two are always the same.
env.wait_for_app_ready(compute_name, namespace="hadron-compute", timeout=60)
# Verify that the updated compute pod has the correct workspace annotations/labels
# as persisted in the metadata database.
hadron_compute_pods = env.kubectl_pods(namespace=compute_namespace)
hadron_compute_pod_name = hadron_compute_pods[0]
verify_compute_pod_metadata(
hadron_compute_pod_name, compute_namespace, test_endpoint_id
)
# Delete the endpoint.
env.hcc_delete_endpoint(test_endpoint_id)
# Ensure that k8s resources of the compute endpoint are deleted.
env.wait_for_compute_resource_deletion(compute_name, compute_namespace)
# Allow running this test directly as a script, without a test runner.
if __name__ == "__main__":
# Prerequisites are checked first (check_prerequisites comes from kind_test_env), then the
# single test in this file is run.
check_prerequisites()
test_create_endpoint_and_connect()

View File

@@ -479,6 +479,7 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
wal_rate_limiter: empty_wal_rate_limiter,
num_safekeepers: 0,
safekeeper_status: [0; 32],
safekeeper_commit_lsn: [0; 32],
}
}

View File

@@ -396,6 +396,7 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
WalproposerShmemState *wp_shmem;
uint32 num_safekeepers;
uint32 num_active_safekeepers;
XLogRecPtr max_active_safekeeper_commit_lag;
/* END_HADRON */
/* We put all the tuples into a tuplestore in one go. */
@@ -451,35 +452,53 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
// Note that we are taking a mutex when reading from walproposer shared memory so that the total safekeeper count is
// consistent with the active wal acceptors count. Assuming that we don't query this view too often the mutex should
// not be a huge deal.
XLogRecPtr min_commit_lsn = InvalidXLogRecPtr;
XLogRecPtr max_commit_lsn = InvalidXLogRecPtr;
XLogRecPtr lsn;
wp_shmem = GetWalpropShmemState();
SpinLockAcquire(&wp_shmem->mutex);
num_safekeepers = wp_shmem->num_safekeepers;
num_active_safekeepers = 0;
for (int i = 0; i < num_safekeepers; i++) {
if (wp_shmem->safekeeper_status[i] == 1) {
num_active_safekeepers++;
// Only track the commit LSN lag among active safekeepers.
// If there are inactive safekeepers we will raise another alert so this lag value
// is less critical.
lsn = wp_shmem->safekeeper_commit_lsn[i];
if (XLogRecPtrIsInvalid(min_commit_lsn) || lsn < min_commit_lsn) {
min_commit_lsn = lsn;
}
if (XLogRecPtrIsInvalid(max_commit_lsn) || lsn > max_commit_lsn) {
max_commit_lsn = lsn;
}
}
}
// Calculate max commit LSN lag across active safekeepers
max_active_safekeeper_commit_lag = (XLogRecPtrIsInvalid(min_commit_lsn) ? 0 : max_commit_lsn - min_commit_lsn);
SpinLockRelease(&wp_shmem->mutex);
}
{
metric_t databricks_metrics[] = {
{"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)},
{"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)},
{"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)},
{"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)},
{"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers},
{"num_configured_safekeepers", false, 0.0, (double) num_safekeepers},
{NULL, false, 0, 0},
};
for (int i = 0; databricks_metrics[i].name != NULL; i++)
{
metric_to_datums(&databricks_metrics[i], &values[0], &nulls[0]);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
}
metric_t databricks_metrics[] = {
{"sql_index_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->index_corruption_count)},
{"sql_data_corruption_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->data_corruption_count)},
{"sql_internal_error_count", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->internal_error_count)},
{"ps_corruption_detected", false, 0, (double) pg_atomic_read_u32(&databricks_metrics_shared->ps_corruption_detected)},
{"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers},
{"num_configured_safekeepers", false, 0.0, (double) num_safekeepers},
{"max_active_safekeeper_commit_lag", false, 0.0, (double) max_active_safekeeper_commit_lag},
{NULL, false, 0, 0},
};
for (int i = 0; databricks_metrics[i].name != NULL; i++)
{
metric_to_datums(&databricks_metrics[i], &values[0], &nulls[0]);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
}
/* END_HADRON */
}
/* END_HADRON */
pfree(metrics);

View File

@@ -436,6 +436,8 @@ typedef struct WalproposerShmemState
uint32 num_safekeepers;
/* Per-safekeeper status flags: 0=inactive, 1=active */
uint8 safekeeper_status[MAX_SAFEKEEPERS];
/* Per-safekeeper commit LSN for metrics */
XLogRecPtr safekeeper_commit_lsn[MAX_SAFEKEEPERS];
/* END_HADRON */
} WalproposerShmemState;

View File

@@ -2106,6 +2106,16 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
if (wp->config->syncSafekeepers)
return;
/* BEGIN_HADRON */
// Record safekeeper commit LSN in shared memory for lag monitoring
{
WalproposerShmemState *shmem = wp->api.get_shmem_state(wp);
Assert(sk->index < MAX_SAFEKEEPERS);
SpinLockAcquire(&shmem->mutex);
shmem->safekeeper_commit_lsn[sk->index] = sk->appendResponse.commitLsn;
SpinLockRelease(&shmem->mutex);
}
/* END_HADRON */
/* handle fresh ps_feedback */
if (sk->appendResponse.ps_feedback.present)
@@ -2243,6 +2253,7 @@ walprop_pg_reset_safekeeper_statuses_for_metrics(WalProposer *wp, uint32 num_saf
SpinLockAcquire(&shmem->mutex);
shmem->num_safekeepers = num_safekeepers;
memset(shmem->safekeeper_status, 0, sizeof(shmem->safekeeper_status));
memset(shmem->safekeeper_commit_lsn, 0, sizeof(shmem->safekeeper_commit_lsn));
SpinLockRelease(&shmem->mutex);
}

View File

@@ -24,6 +24,7 @@ use tracing::*;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use utils::pausable_failpoint;
use crate::GlobalTimelines;
use crate::handler::SafekeeperPostgresHandler;
@@ -598,6 +599,8 @@ impl WalAcceptor {
// Note that a flush can still happen on segment bounds, which will result
// in an AppendResponse.
if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
// allow tests to pause AppendRequest processing to simulate lag
pausable_failpoint!("sk-acceptor-pausable");
msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
dirty = true;
}

View File

@@ -547,6 +547,7 @@ pgxn/neon/neon_perf_counters.c:neon_get_perf_counters(PG_FUNCTION_ARGS)
pgxn/neon/neon_perf_counters.c: neon_per_backend_counters totals = {0};
pgxn/neon/neon_perf_counters.c: uint32 num_safekeepers;
pgxn/neon/neon_perf_counters.c: uint32 num_active_safekeepers;
pgxn/neon/neon_perf_counters.c: XLogRecPtr max_active_safekeeper_commit_lag;
pgxn/neon/neon_perf_counters.c: for (int procno = 0; procno < NUM_NEON_PERF_COUNTER_SLOTS; procno++)
pgxn/neon/neon_perf_counters.c: neon_per_backend_counters *counters = &neon_per_backend_counters_shared[procno];
pgxn/neon/neon_perf_counters.c: metrics = neon_perf_counters_to_metrics(&totals);
@@ -559,8 +560,14 @@ pgxn/neon/neon_perf_counters.c: num_active_safekeepers = 0;
pgxn/neon/neon_perf_counters.c: for (int i = 0; i < num_safekeepers; i++) {
pgxn/neon/neon_perf_counters.c: if (wp_shmem->safekeeper_status[i] == 1) {
pgxn/neon/neon_perf_counters.c: num_active_safekeepers++;
pgxn/neon/neon_perf_counters.c: // Only track the commit LSN lag among active safekeepers.
pgxn/neon/neon_perf_counters.c: // If there are inactive safekeepers we will raise another alert so this lag value
pgxn/neon/neon_perf_counters.c: lsn = wp_shmem->safekeeper_commit_lsn[i];
pgxn/neon/neon_perf_counters.c: // Calculate max commit LSN lag across active safekeepers
pgxn/neon/neon_perf_counters.c: max_active_safekeeper_commit_lag = (XLogRecPtrIsInvalid(min_commit_lsn) ? 0 : max_commit_lsn - min_commit_lsn);
pgxn/neon/neon_perf_counters.c: {"num_active_safekeepers", false, 0.0, (double) num_active_safekeepers},
pgxn/neon/neon_perf_counters.c: {"num_configured_safekeepers", false, 0.0, (double) num_safekeepers},
pgxn/neon/neon_perf_counters.c: {"max_active_safekeeper_commit_lag", false, 0.0, (double) max_active_safekeeper_commit_lag},
pgxn/neon/neon_perf_counters.h: * neon_perf_counters.h
pgxn/neon/neon_perf_counters.h: * Performance counters for neon storage requests
pgxn/neon/neon_perf_counters.h:#ifndef NEON_PERF_COUNTERS_H
@@ -1487,6 +1494,8 @@ pgxn/neon/walproposer.h: /* Number of safekeepers in the config */
pgxn/neon/walproposer.h: uint32 num_safekeepers;
pgxn/neon/walproposer.h: /* Per-safekeeper status flags: 0=inactive, 1=active */
pgxn/neon/walproposer.h: uint8 safekeeper_status[MAX_SAFEKEEPERS];
pgxn/neon/walproposer.h: /* Per-safekeeper commit LSN for metrics */
pgxn/neon/walproposer.h: XLogRecPtr safekeeper_commit_lsn[MAX_SAFEKEEPERS];
pgxn/neon/walproposer.h: * Report safekeeper state to proposer
pgxn/neon/walproposer.h: * Current term of the safekeeper; if it is higher than proposer's, the
pgxn/neon/walproposer.h: /* Safekeeper reports back his awareness about which WAL is committed, as */
@@ -1718,6 +1727,9 @@ pgxn/neon/walproposer_pg.c: * Based on commitLsn and safekeeper responses includ
pgxn/neon/walproposer_pg.c: * None of that is functional in sync-safekeepers.
pgxn/neon/walproposer_pg.c:walprop_pg_process_safekeeper_feedback(WalProposer *wp, Safekeeper *sk)
pgxn/neon/walproposer_pg.c: if (wp->config->syncSafekeepers)
pgxn/neon/walproposer_pg.c: // Record safekeeper commit LSN in shared memory for lag monitoring
pgxn/neon/walproposer_pg.c: Assert(sk->index < MAX_SAFEKEEPERS);
pgxn/neon/walproposer_pg.c: shmem->safekeeper_commit_lsn[sk->index] = sk->appendResponse.commitLsn;
pgxn/neon/walproposer_pg.c: SetNeonCurrentClusterSize(sk->appendResponse.ps_feedback.currentClusterSize);
pgxn/neon/walproposer_pg.c: * hardened and will be fetched from one of safekeepers by
pgxn/neon/walproposer_pg.c: * neon_walreader if needed.
@@ -1729,6 +1741,7 @@ pgxn/neon/walproposer_pg.c:uint64 GetNeonCurrentClusterSize(void);
pgxn/neon/walproposer_pg.c:walprop_pg_reset_safekeeper_statuses_for_metrics(WalProposer *wp, uint32 num_safekeepers)
pgxn/neon/walproposer_pg.c: shmem->num_safekeepers = num_safekeepers;
pgxn/neon/walproposer_pg.c: memset(shmem->safekeeper_status, 0, sizeof(shmem->safekeeper_status));
pgxn/neon/walproposer_pg.c: memset(shmem->safekeeper_commit_lsn, 0, sizeof(shmem->safekeeper_commit_lsn));
pgxn/neon/walproposer_pg.c:walprop_pg_update_safekeeper_status_for_metrics(WalProposer *wp, uint32 sk_index, uint8 status)
pgxn/neon/walproposer_pg.c: Assert(sk_index < MAX_SAFEKEEPERS);
pgxn/neon/walproposer_pg.c: shmem->safekeeper_status[sk_index] = status;

File diff suppressed because it is too large Load Diff

View File

@@ -2776,6 +2776,11 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
# Check that max_active_safekeeper_commit_lag metric exists and is zero with single safekeeper
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'max_active_safekeeper_commit_lag'"
)
assert cur.fetchone() == (0,), "Expected zero commit lag with one safekeeper"
# Get the safekeeper
sk = env.safekeepers[0]
@@ -2819,6 +2824,11 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
"SELECT value FROM neon_perf_counters WHERE metric = 'num_configured_safekeepers'"
)
assert cur.fetchone() == (1,), "Expected 1 configured safekeeper"
# Check that max_active_safekeeper_commit_lag metric exists and is zero with no active safekeepers
cur.execute(
"SELECT value FROM neon_perf_counters WHERE metric = 'max_active_safekeeper_commit_lag'"
)
assert cur.fetchone() == (0,), "Expected zero commit lag with no active safekeepers"
# Sanity check that the hanging insert is indeed still hanging. Otherwise means the circuit breaker we
# implemented didn't work as expected.
@@ -2933,3 +2943,77 @@ def test_global_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
with conn.cursor() as cur:
cur.execute("select count(*) from t2")
assert cur.fetchone() == (3000,)
@pytest.mark.skip(reason="Lakebase Mode")
def test_max_active_safekeeper_commit_lag(neon_env_builder: NeonEnvBuilder):
    """
    This test validates the `max_active_safekeeper_commit_lag` metric. The
    strategy is to intentionally create a scenario where one safekeeper falls
    behind (by pausing it with a failpoint), observe that the metric correctly
    reports this lag, and then confirm that the metric returns to zero after the
    lagging safekeeper catches up (once the failpoint is removed).
    """
    neon_env_builder.num_safekeepers = 2
    env = neon_env_builder.init_start()

    # Create branch and start endpoint
    env.create_branch("test_commit_lsn_lag_failpoint")
    endpoint = env.endpoints.create_start("test_commit_lsn_lag_failpoint")

    # Enable neon extension and table
    endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon")
    endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")

    def current_commit_lag():
        """Read the current value of the lag metric over a fresh connection."""
        with closing(endpoint.connect()) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT value FROM neon_perf_counters WHERE metric = 'max_active_safekeeper_commit_lag'"
                )
                row = cur.fetchone()
                assert row is not None, "max_active_safekeeper_commit_lag metric not found"
                lag = row[0]
                log.info(f"Current commit lag: {lag}")
                return lag

    # Identify the lagging safekeeper and configure failpoint to pause
    lagging_sk = env.safekeepers[1]
    with lagging_sk.http_client() as http_cli:
        http_cli.configure_failpoints(("sk-acceptor-pausable", "pause"))

    # Note: Insert could hang because the failpoint above causes the safekeepers to lose quorum.
    def run_hanging_insert():
        endpoint.safe_psql("INSERT INTO t SELECT generate_series(1,500), 'payload'")

    # Start the insert in a background thread
    bg_thread = threading.Thread(target=run_hanging_insert)
    bg_thread.start()

    # Wait for the lag metric to become positive. Raising is what makes wait_until poll again.
    def lag_is_positive():
        if current_commit_lag() == 0.0:
            raise Exception("Commit lag is still zero, trying again...")

    # Confirm that we can observe a positive lag value
    wait_until(lag_is_positive)

    # Unpause the failpoint so that the safekeepers sync back up. This should also unstuck the hanging insert.
    with lagging_sk.http_client() as http_cli:
        http_cli.configure_failpoints(("sk-acceptor-pausable", "off"))

    # Wait for the hanging insert to complete
    bg_thread.join(timeout=30)
    assert not bg_thread.is_alive(), "Hanging insert did not complete within timeout"
    log.info("Hanging insert is unstuck successfully")

    # Like lag_is_positive, this must raise while the condition does not hold yet. Merely
    # returning False would count as success on the first poll, so the test would pass even
    # if the lag never went back to zero.
    def lag_is_zero():
        lag = current_commit_lag()
        assert lag == 0.0, f"Commit lag is still {lag}, trying again..."

    # Confirm that the lag eventually returns to zero
    wait_until(lag_is_zero)