safekeeper: add global disk usage utilization limit (#12605)

N.B: No-op for the neon-env.

## Problem

We added a per-timeline disk utilization circuit breaker that stops the
safekeeper from accepting more WAL writes once the disk space used by a
timeline exceeds a configured limit. We designed that mechanism mainly
as a guard against WAL upload/backup bugs, assuming that as long as WAL
uploads proceed normally we will not run into disk pressure. This turned
out not to be true: in one of our load tests, with 500 PGs ingesting
data at the same time, safekeeper disk utilization started to creep up
even though WAL uploads were completely healthy (we likely just maxed
out the S3 upload bandwidth of the single SK). The per-timeline disk
utilization protection is therefore not enough when too many timelines
are ingesting data at the same time.

## Summary of changes

Added a global disk utilization circuit breaker that stops a safekeeper
from accepting more WAL writes if the total disk usage on the safekeeper
(across all tenants) exceeds a configured limit. The circuit breaker
consists of two parts:

1. A "global disk usage watcher" background task that runs at a
configured interval (default: every minute) and measures how much disk
space is used on the safekeeper's filesystem. The task also checks the
measured usage against the limit and publishes the result to a global
atomic boolean flag.
2. The `hadron_check_disk_usage()` routine (in `timeline.rs`) now also
checks the global flag published in step 1 and fails the `WalAcceptor`
(tripping the circuit breaker) if the flag is raised; see the sketch
below.
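
To make the shape of the mechanism concrete, here is a minimal sketch of the
pattern in Python (the actual implementation is Rust code inside the
safekeeper). `GlobalDiskUsageWatcher`, `usage_exceeded`, and
`check_global_disk_usage` are invented names, and treating a ratio of `0` as
"disabled" is an assumption made for the sketch, not the real flag semantics:

```python
import shutil
import threading
import time


class GlobalDiskUsageWatcher:
    """Background watcher that raises a shared flag when total disk usage crosses a ratio."""

    def __init__(self, data_dir: str, max_usage_ratio: float, check_interval_secs: float):
        self.data_dir = data_dir
        self.max_usage_ratio = max_usage_ratio
        self.check_interval_secs = check_interval_secs
        # Shared flag published by the watcher; set() means the breaker is tripped.
        self.usage_exceeded = threading.Event()

    def run_forever(self) -> None:
        # Poll the filesystem at the configured interval and publish the result.
        while True:
            total, used, _free = shutil.disk_usage(self.data_dir)
            if self.max_usage_ratio > 0 and used > self.max_usage_ratio * total:
                self.usage_exceeded.set()
            else:
                self.usage_exceeded.clear()
            time.sleep(self.check_interval_secs)

    def check_global_disk_usage(self) -> None:
        # Called on the WAL write path: fail the write while the flag is raised.
        if self.usage_exceeded.is_set():
            raise RuntimeError("Global disk usage exceeded limit")
```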

The disk usage limit is disabled by default.
It can be tuned with the `--max-global-disk-usage-ratio` CLI arg.
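
Continuing the hypothetical sketch above, the CLI settings map onto the
watcher roughly like this (the check interval mirrors the default described
above, the ratio mirrors the value used in the test below, and the data
directory path is made up):

```python
# Hypothetical wiring: check once a minute, trip the breaker above 80% disk usage.
watcher = GlobalDiskUsageWatcher(
    data_dir="/data/safekeeper",  # made-up path
    max_usage_ratio=0.8,
    check_interval_secs=60.0,
)
threading.Thread(target=watcher.run_forever, daemon=True).start()
```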

## How is this tested?

Added integration test
`test_wal_acceptor.py::test_global_disk_usage_limit`.

Also noticed that I had not been using the `wait_until(f)` test helper
correctly: the `f` passed in is supposed to raise an exception when the
condition is not met, rather than return `False`. Fixed this in both
circuit breaker tests.
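
For reference, a small self-contained sketch of that contract;
`wait_until_sketch` is a toy stand-in that only models the assumed
retry-on-exception behaviour of the real `wait_until` helper, and the log
lines are fabricated:

```python
import time


def wait_until_sketch(f, timeout=20.0, interval=0.5):
    # Toy model of the assumed semantics: keep calling f while it raises;
    # any successful return (even False) ends the wait.
    deadline = time.monotonic() + timeout
    while True:
        try:
            return f()
        except Exception:
            if time.monotonic() >= deadline:
                raise
            time.sleep(interval)


log_lines = ["safekeeper starting", "Global disk usage exceeded limit"]


def error_logged():
    # Correct predicate style: raise while the condition is unmet.
    if not any("Global disk usage exceeded limit" in line for line in log_lines):
        raise RuntimeError("expected error message not found yet")


wait_until_sketch(error_logged)  # a predicate returning False would (wrongly) count as success
```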

---------

Co-authored-by: William Huang <william.huang@databricks.com>
Commit 8b18d8b31b (parent 3e4cbaed67), authored by Vlad Lazar on 2025-07-16 15:43:17 +01:00, committed by GitHub. 10 changed files with 284 additions and 8 deletions.

```diff
@@ -2788,7 +2788,8 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
     # Wait for the error message to appear in the compute log
     def error_logged():
-        return endpoint.log_contains("WAL storage utilization exceeds configured limit") is not None
+        if endpoint.log_contains("WAL storage utilization exceeds configured limit") is None:
+            raise Exception("Expected error message not found in compute log yet")
 
     wait_until(error_logged)
     log.info("Found expected error message in compute log, resuming.")
```

```diff
@@ -2822,3 +2823,87 @@ def test_timeline_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
             cur.execute("select count(*) from t")
             # 2000 rows from first insert + 1000 from last insert
             assert cur.fetchone() == (3000,)
+
+
+def test_global_disk_usage_limit(neon_env_builder: NeonEnvBuilder):
+    """
+    Similar to `test_timeline_disk_usage_limit`, but test that the global disk usage circuit breaker
+    also works as expected. The test scenario:
+    1. Create a timeline and endpoint.
+    2. Mock high disk usage via failpoint
+    3. Write data to the timeline so that disk usage exceeds the limit.
+    4. Verify that the writes hang and the expected error message appears in the compute log.
+    5. Mock low disk usage via failpoint
+    6. Verify that the hanging writes unblock and we can continue to write as normal.
+    """
+    neon_env_builder.num_safekeepers = 1
+    remote_storage_kind = s3_storage()
+    neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
+
+    env = neon_env_builder.init_start()
+    env.create_branch("test_global_disk_usage_limit")
+    endpoint = env.endpoints.create_start("test_global_disk_usage_limit")
+
+    with closing(endpoint.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute("create table t2(key int, value text)")
+
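+    # Restart the safekeepers with the global disk usage watcher enabled
+    # (check every second, trip the breaker above 80% of disk capacity).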
+    for sk in env.safekeepers:
+        sk.stop().start(
+            extra_opts=["--global-disk-check-interval=1s", "--max-global-disk-usage-ratio=0.8"]
+        )
+
+    # Set the failpoint to have the disk usage check return u64::MAX, which definitely exceeds the practical
+    # limits in the test environment.
+    for sk in env.safekeepers:
+        sk.http_client().configure_failpoints(
+            [("sk-global-disk-usage", "return(18446744073709551615)")]
+        )
+
+    # Wait until the global disk usage limit watcher trips the circuit breaker.
+    def error_logged_in_sk():
+        for sk in env.safekeepers:
+            if sk.log_contains("Global disk usage exceeded limit") is None:
+                raise Exception("Expected error message not found in safekeeper log yet")
+
+    wait_until(error_logged_in_sk)
+
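+    # Run the insert on a background thread; it is expected to hang while the
+    # global circuit breaker is tripped.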
+    def run_hanging_insert_global():
+        with closing(endpoint.connect()) as bg_conn:
+            with bg_conn.cursor() as bg_cur:
+                # This should generate more than 1KiB of WAL
+                bg_cur.execute("insert into t2 select generate_series(1,2000), 'payload'")
+
+    bg_thread_global = threading.Thread(target=run_hanging_insert_global)
+    bg_thread_global.start()
+
+    def error_logged_in_compute():
+        if endpoint.log_contains("Global disk usage exceeded limit") is None:
+            raise Exception("Expected error message not found in compute log yet")
+
+    wait_until(error_logged_in_compute)
+    log.info("Found the expected error message in compute log, resuming.")
+
+    time.sleep(2)
+    assert bg_thread_global.is_alive(), "Global hanging insert unblocked prematurely!"
+
+    # Make the disk usage check always return 0 through the failpoint to simulate the disk pressure easing.
+    # The SKs should resume accepting WAL writes without restarting.
+    for sk in env.safekeepers:
+        sk.http_client().configure_failpoints([("sk-global-disk-usage", "return(0)")])
+
+    bg_thread_global.join(timeout=120)
+    assert not bg_thread_global.is_alive(), "Hanging global insert did not complete after restart"
+    log.info("Global hanging insert unblocked.")
+
+    # Verify that we can continue to write as normal and we don't have obvious data corruption
+    # following the recovery.
+    with closing(endpoint.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute("insert into t2 select generate_series(2001,3000), 'payload'")
+
+    with closing(endpoint.connect()) as conn:
+        with conn.cursor() as cur:
+            cur.execute("select count(*) from t2")
+            assert cur.fetchone() == (3000,)
```