Files
neon/test_runner/regress/test_gc_aggressive.py
Christian Schwarz aad410c8f1 improve ondemand-download latency observability (#11421)
## Problem

We don't have metrics to exactly quantify the end user impact of
on-demand downloads.

Perf tracing is underway (#11140) to supply us with high-resolution
*samples*.

But it will also be useful to have some aggregate per-timeline and
per-instance metrics that definitively contain all observations.

## Summary of changes

This PR consists of independent commits that should be reviewed
independently.

However, for convenience, we're going to merge them together.

- refactor(metrics): measure_remote_op can use async traits
- impr(pageserver metrics): task_kind dimension for
remote_timeline_client latency histo
  - implements https://github.com/neondatabase/cloud/issues/26800
- refs
https://github.com/neondatabase/cloud/issues/26193#issuecomment-2769705793
- use the opportunity to rename the metric and add a _global suffix;
checked grafana export, it's only used in two personal dashboards, one
of them mine, the other by Heikki
- log on-demand download latency for expensive-to-query but precise
ground truth
- metric for wall clock time spent waiting for on-demand downloads

## Refs

- refs https://github.com/neondatabase/cloud/issues/26800
- a bunch of minor investigations / incidents into latency outliers
2025-04-04 18:04:39 +00:00

165 lines
5.7 KiB
Python

from __future__ import annotations
import asyncio
import concurrent.futures
import random
from typing import TYPE_CHECKING
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.remote_storage import RemoteStorageKind
if TYPE_CHECKING:
from fixtures.common_types import TimelineId
# Test configuration
#
# Create a table with {NUM_ROWS} rows, and perform {UPDATES_TO_PERFORM} random
# UPDATEs on it, using {NUM_CONNECTIONS} separate connections.
NUM_CONNECTIONS = 10
NUM_ROWS = 100000
UPDATES_TO_PERFORM = 10000
# At the same time, run UPDATEs and GC
async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
workers = []
updates_performed = 0
# Perform aggressive GC with 0 horizon
async def gc(env: NeonEnv, timeline: TimelineId):
pageserver_http = env.pageserver.http_client()
nonlocal updates_performed
global UPDATES_TO_PERFORM
loop = asyncio.get_running_loop()
def do_gc():
pageserver_http.timeline_checkpoint(env.initial_tenant, timeline)
pageserver_http.timeline_gc(env.initial_tenant, timeline, 0)
with concurrent.futures.ThreadPoolExecutor() as pool:
while updates_performed < UPDATES_TO_PERFORM:
await loop.run_in_executor(pool, do_gc)
# Run random UPDATEs on test table
async def update_table(endpoint: Endpoint):
pg_conn = await endpoint.connect_async()
nonlocal updates_performed
while updates_performed < UPDATES_TO_PERFORM:
updates_performed += 1
id = random.randrange(1, NUM_ROWS)
await pg_conn.fetchrow(f"UPDATE foo SET counter = counter + 1 WHERE id = {id}")
for _ in range(NUM_CONNECTIONS):
workers.append(asyncio.create_task(update_table(endpoint)))
workers.append(asyncio.create_task(gc(env, timeline)))
# await all workers
await asyncio.gather(*workers)
#
# Aggressively force GC, while running queries.
#
# (repro for https://github.com/neondatabase/neon/issues/1047)
#
def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
# Disable pitr, because here we want to test branch creation after GC
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
timeline = env.create_branch("test_gc_aggressive", ancestor_branch_name="main")
endpoint = env.endpoints.create_start("test_gc_aggressive")
with endpoint.cursor() as cur:
# Create table, and insert the first 100 rows
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
f"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, {NUM_ROWS}) g
"""
)
cur.execute("CREATE INDEX ON foo(id)")
asyncio.run(update_and_gc(env, endpoint, timeline))
cur.execute("SELECT COUNT(*), SUM(counter) FROM foo")
r = cur.fetchone()
assert r is not None
assert r == (NUM_ROWS, UPDATES_TO_PERFORM)
#
def test_gc_index_upload(neon_env_builder: NeonEnvBuilder):
num_index_uploads = 0
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
# Disable time-based pitr, we will use LSN-based thresholds in the manual GC calls
env = neon_env_builder.init_start(initial_tenant_conf={"pitr_interval": "0 sec"})
tenant_id = env.initial_tenant
timeline_id = env.create_branch("test_gc_index_upload", ancestor_branch_name="main")
endpoint = env.endpoints.create_start("test_gc_index_upload")
pageserver_http = env.pageserver.http_client()
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
cur.execute("CREATE TABLE foo (id int, counter int, t text)")
cur.execute(
"""
INSERT INTO foo
SELECT g, 0, 'long string to consume some space' || g
FROM generate_series(1, 100000) g
"""
)
# Helper function that gets the number of given kind of remote ops from the metrics
def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
ps_metrics = env.pageserver.http_client().get_metrics()
total = 0.0
for sample in ps_metrics.query_all(
name="pageserver_remote_timeline_client_seconds_global_count",
filter={
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},
):
total += sample[2]
return int(total)
# Sanity check that the metric works
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
before = get_num_remote_ops("index", "upload")
assert before > 0
# Run many cycles of GC. Then check that the number of index files
# uploads didn't grow much. In particular we don't want to re-upload the
# index file on every GC iteration, when it has no work to do.
#
# On each iteration, we use a slightly smaller GC horizon, so that the GC
# at least needs to check if it has work to do.
for i in range(100):
cur.execute("INSERT INTO foo VALUES (0, 0, 'foo')")
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000 - i * 32)
num_index_uploads = get_num_remote_ops("index", "upload")
# Also make sure that a no-op compaction doesn't upload the index
# file unnecessarily.
pageserver_http.timeline_compact(tenant_id, timeline_id)
log.info(f"{num_index_uploads} index uploads after GC iteration {i}")
after = num_index_uploads
log.info(f"{after - before} new index uploads during test")
assert after - before < 5