mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 01:20:38 +00:00
Before this PR, each core had 3 executor threads from 3 different runtimes. With this PR, we have just one runtime, with one thread per core. Switching to a single tokio runtime should reduce that effective over-commit of CPU and, in theory, help with tail latencies, iff all tokio tasks are well-behaved and yield to the runtime regularly.

Are All Tasks Well-Behaved? Are We Ready?
-----------------------------------------

Sadly, there doesn't seem to be good out-of-the-box tokio tooling to answer this question.

We *believe* all tasks are well-behaved in today's code base, as of the switch to `virtual_file_io_engine = "tokio-epoll-uring"` in production (https://github.com/neondatabase/aws/pull/1121).

The only remaining executor-thread-blocking code is walredo and some filesystem namespace operations. Work on filesystem namespace operations is tracked in #6663; they are not considered likely to actually block at this time.

Regarding walredo: it currently does a blocking `poll` for read/write on the pipe file descriptors we use for IPC with the walredo process. There is an ongoing experiment to make walredo async (#6628). It needs more time because of surprisingly tricky trade-offs, which are articulated in that PR's description (itself still WIP). What's relevant for *this* PR is that

1. walredo is always CPU-bound
2. production tail latencies for walredo request-response (`pageserver_wal_redo_seconds_bucket`) are
   - p90: with few exceptions, low hundreds of microseconds
   - p95: except on very packed pageservers, below 1ms
   - p99: all below 50ms, vast majority below 1ms
   - p99.9: almost all around 50ms, rarely at >= 70ms
   - [Dashboard Link](https://neonprod.grafana.net/d/edgggcrmki3uof/2024-03-walredo-latency?orgId=1&var-ds=ZNX49CDVz&var-pXX_by_instance=0.9&var-pXX_by_instance=0.99&var-pXX_by_instance=0.95&var-adhoc=instance%7C%21%3D%7Cpageserver-30.us-west-2.aws.neon.tech&var-per_instance_pXX_max_seconds=0.0005&from=1711049688777&to=1711136088777)

The ones below 1ms are below our current threshold for when we start thinking about yielding to the executor. The stalls of tens of milliseconds aren't great. However, we can't be sure whether they are inherently necessary to do the walredo work, or whether walredo could be faster with less contention for CPU. That contention comes not least from the implicit over-commit of CPU by the three runtimes.

On the first item (walredo always being CPU-bound work): it means that walredo processes will always compete with the executor threads. We could yield, using async walredo, but then we hit the trade-offs explained in that PR.

tl;dr: the risk of stalling executor threads through blocking walredo seems low. Switching to one runtime removes one potential source of higher-than-necessary stall times (explained in the previous paragraphs).

Code Changes
------------

- Remove the 3 different runtime definitions.
- Add a new definition called `THE_RUNTIME`.
- Use it in all places that previously used one of the 3 removed runtimes.
- Remove the argument from `task_mgr`.
- Fix failpoint usage where `pausable_failpoint!` should have been used. We encountered some actual failures because of this, e.g., hung `get_metric()` calls during test teardown that would client-timeout after 300s.
As indicated by the comment above `THE_RUNTIME`, we could take this clean-up further. But before we create so much churn, let's first validate that there's no perf regression.

Performance
-----------

We will test this in staging using the various nightly benchmark runs.

However, the worst-case impact of this change is likely compaction (=> image layer creation) competing with compute requests. Image layer creation work can't easily be generated and repeated quickly by pagebench. So, we'll simply watch getpage and basebackup tail latencies in staging.

Additionally, I have done manual benchmarking using pagebench. Report: https://neondatabase.notion.site/2024-03-23-oneruntime-change-benchmarking-22a399c411e24399a73311115fb703ec?pvs=4

Tail latencies and throughput are marginally better (no regression = good). The exception is a workload with 128 clients against one tenant. There, the p99.9 and p99.99 getpage latencies are about 2x worse (at slightly lower throughput). A dip in throughput every 20s (`compaction_period`) is clearly visible, and is probably responsible for that worse tail latency. This has the potential to improve with async walredo, and is an edge-case workload anyway.

Future Work
-----------

1. Once this change has shown satisfying results in production, change the codebase to use the ambient runtime instead of explicitly referencing `THE_RUNTIME`.
2. Have a mode where we run with a single-threaded runtime, so we uncover executor stalls more quickly.
3. Switch to an async-native failpoints library, or write our own: https://github.com/neondatabase/neon/issues/7216
168 lines
6.9 KiB
Python
import threading
import time
from contextlib import closing, contextmanager

import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder

pytest_plugins = "fixtures.neon_fixtures"


# Open a connection to the endpoint and yield a cursor; both are closed on exit.
@contextmanager
def pg_cur(pg):
    with closing(pg.connect()) as conn:
        with conn.cursor() as cur:
            yield cur


# Periodically check that all backpressure lags are below their configured thresholds,
# and assert if they are not.
# If the check query fails (or an assertion fails, since AssertionError is also caught
# below), set stop_event and stop the thread. The main thread should notice that and stop the test.
def check_backpressure(endpoint: Endpoint, stop_event: threading.Event, polling_interval=5):
    log.info("checks started")

    # Read the configured limits once; a limit of 0 disables the corresponding check below.
    with pg_cur(endpoint) as cur:
        cur.execute("select pg_size_bytes(current_setting('max_replication_write_lag'))")
        res = cur.fetchone()
        max_replication_write_lag_bytes = res[0]
        log.info(f"max_replication_write_lag: {max_replication_write_lag_bytes} bytes")

        cur.execute("select pg_size_bytes(current_setting('max_replication_flush_lag'))")
        res = cur.fetchone()
        max_replication_flush_lag_bytes = res[0]
        log.info(f"max_replication_flush_lag: {max_replication_flush_lag_bytes} bytes")

        cur.execute("select pg_size_bytes(current_setting('max_replication_apply_lag'))")
        res = cur.fetchone()
        max_replication_apply_lag_bytes = res[0]
        log.info(f"max_replication_apply_lag: {max_replication_apply_lag_bytes} bytes")

    with pg_cur(endpoint) as cur:
        while not stop_event.is_set():
            try:
                cur.execute(
                    """
                    select pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn) as received_lsn_lag,
                    pg_wal_lsn_diff(pg_current_wal_flush_lsn(),disk_consistent_lsn) as disk_consistent_lsn_lag,
                    pg_wal_lsn_diff(pg_current_wal_flush_lsn(),remote_consistent_lsn) as remote_consistent_lsn_lag,
                    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),received_lsn)),
                    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),disk_consistent_lsn)),
                    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_flush_lsn(),remote_consistent_lsn))
                    from backpressure_lsns();
                    """
                )

                res = cur.fetchone()
                received_lsn_lag = res[0]
                disk_consistent_lsn_lag = res[1]
                remote_consistent_lsn_lag = res[2]

                log.info(
                    f"received_lsn_lag = {received_lsn_lag} ({res[3]}), "
                    f"disk_consistent_lsn_lag = {disk_consistent_lsn_lag} ({res[4]}), "
                    f"remote_consistent_lsn_lag = {remote_consistent_lsn_lag} ({res[5]})"
                )

                # Feedback from the pageserver is not immediate, so allow some lag overflow.
                lag_overflow = 5 * 1024 * 1024  # 5 MiB

                if max_replication_write_lag_bytes > 0:
                    assert received_lsn_lag < max_replication_write_lag_bytes + lag_overflow
                if max_replication_flush_lag_bytes > 0:
                    assert disk_consistent_lsn_lag < max_replication_flush_lag_bytes + lag_overflow
                if max_replication_apply_lag_bytes > 0:
                    assert (
                        remote_consistent_lsn_lag < max_replication_apply_lag_bytes + lag_overflow
                    )

                time.sleep(polling_interval)

            except Exception as e:
                log.info(f"backpressure check query failed: {e}")
                stop_event.set()

    log.info("check thread stopped")


# This test illustrates how to tune backpressure to control the lag
# between the WAL flushed on the compute node and the WAL digested by the pageserver.
#
# To test it, throttle walreceiver ingest using a failpoint and run a heavy write load.
# If backpressure is disabled or not tuned properly, the query will time out, because the walreceiver cannot keep up.
# If backpressure is enabled and tuned properly, insertion will be throttled, but the query will not time out.


@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/1587")
def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    # Create a branch for us
    env.neon_cli.create_branch("test_backpressure")

    endpoint = env.endpoints.create(
        "test_backpressure", config_lines=["max_replication_write_lag=30MB"]
    )
    # Don't skip pg_catalog updates: they run CREATE EXTENSION neon,
    # which is needed for backpressure_lsns() to work.
    endpoint.respec(skip_pg_catalog_updates=False)
    endpoint.start()

    # Set up the check thread
    check_stop_event = threading.Event()
    check_thread = threading.Thread(target=check_backpressure, args=(endpoint, check_stop_event))
    check_thread.start()

    # Configure a failpoint to slow down walreceiver ingest
    with closing(env.pageserver.connect()) as psconn:
        with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur:
            pscur.execute("failpoints walreceiver-after-ingest-blocking=sleep(20)")

    # FIXME
    # Wait for the check thread to start.
    #
    # If the load starts too soon, the check thread cannot authenticate: it is not
    # able to connect to the database because of the lag, while it waits for the
    # LSN to replay to arrive.
    time.sleep(2)

    with pg_cur(endpoint) as cur:
        # Create and initialize test table
        cur.execute("CREATE TABLE foo(x bigint)")

        inserts_to_do = 2000000
        rows_inserted = 0

        while check_thread.is_alive() and rows_inserted < inserts_to_do:
            try:
                cur.execute("INSERT INTO foo select from generate_series(1, 100000)")
                rows_inserted += 100000
            except Exception as e:
                if check_thread.is_alive():
                    log.info("stopping check thread")
                    check_stop_event.set()
                    check_thread.join()
                    raise AssertionError(
                        f"Exception {e} while inserting rows, but WAL lag is within configured threshold. That means backpressure is not tuned properly"
                    ) from e
                else:
                    raise AssertionError(
                        f"Exception {e} while inserting rows and WAL lag overflowed configured threshold. That means backpressure doesn't work."
                    ) from e

        log.info(f"inserted {rows_inserted} rows")

    if check_thread.is_alive():
        log.info("stopping check thread")
        check_stop_event.set()
        check_thread.join()
        log.info("check thread stopped")
    else:
        raise AssertionError(
            "WAL lag overflowed configured threshold. That means backpressure doesn't work."
        )


# TODO test_backpressure_disk_consistent_lsn_lag. Play with pageserver's checkpoint settings
# TODO test_backpressure_remote_consistent_lsn_lag
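The contract between `check_backpressure()` and the test's main loop is easy to lose in the database plumbing. The sketch below isolates it without a database. It assumes the lag can be modeled as a plain callable rather than the `backpressure_lsns()` query. `LAG_OVERFLOW`, `lag_within_limit()` and `lag_watchdog()` are hypothetical names introduced for illustration; they are not part of the Neon test fixtures.

The sketch shows two things:

- the tolerance rule: a limit of 0 disables the check, otherwise the lag must stay strictly below the limit plus 5 MiB;
- how the checker ends itself, so that the main thread can detect it through `Thread.is_alive()`.

```python
import threading
import time
from typing import Callable

# Tolerated overshoot, mirroring lag_overflow in check_backpressure() (5 MiB).
LAG_OVERFLOW = 5 * 1024 * 1024


def lag_within_limit(lag_bytes: int, max_lag_bytes: int, overflow: int = LAG_OVERFLOW) -> bool:
    """Same rule as check_backpressure(): a limit <= 0 disables the check,
    otherwise the lag must stay strictly below limit + overflow."""
    if max_lag_bytes <= 0:
        return True
    return lag_bytes < max_lag_bytes + overflow


def lag_watchdog(
    read_lag: Callable[[], int],
    max_lag_bytes: int,
    stop_event: threading.Event,
    polling_interval: float = 0.01,
) -> None:
    """Hypothetical, database-free stand-in for check_backpressure().

    Polls read_lag() until stop_event is set. A failed read or a violated
    limit sets stop_event and ends the thread."""
    while not stop_event.is_set():
        try:
            lag = read_lag()
            if not lag_within_limit(lag, max_lag_bytes):
                raise AssertionError(f"lag {lag} >= limit {max_lag_bytes} + {LAG_OVERFLOW}")
            time.sleep(polling_interval)
        except Exception:
            # AssertionError is an Exception subclass, so both failure modes land here.
            stop_event.set()


if __name__ == "__main__":
    MB = 1024 * 1024
    stop = threading.Event()
    # A constant 40 MB lag exceeds 30 MB + 5 MB, so the watchdog stops itself.
    t = threading.Thread(target=lag_watchdog, args=(lambda: 40 * MB, 30 * MB, stop))
    t.start()
    t.join(timeout=5)
    print(t.is_alive(), stop.is_set())  # False True
```

A violated threshold stops the thread because the loop catches `Exception` rather than only query errors, and `AssertionError` is an `Exception` subclass. The sketch uses an explicit `raise` instead of `assert` so the check still runs under `python -O`.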