benchmarking: extend test_page_service_batching.py to cover concurrent IO + batching under random reads (#10466)

This PR commits the benchmarks I ran to qualify concurrent IO before we
released it.

Changes:
- Add an `l0stack` fixture: a reusable abstraction for creating a stack of
  L0 deltas, each of which has 1 `Value::Delta` per page (see the sketch
  after this list).
- Such a stack of L0 deltas is a good and understandable demo for
  concurrent IO, because reconstructing any page requires reading
  `l0_stack_height` Values.
  Before concurrent IO, the reads were sequential.
  With concurrent IO, they are executed concurrently.
- So, switch `test_latency` to use the l0stack.
- Teach `pagebench`, which is used by `test_latency`, to limit itself to
  the blocks of the relation created by the l0stack abstraction.
- Additionally, parametrize `test_latency` over the dimensions
  `ps_io_concurrency`, `l0_stack_height`, and `queue_depth`.
- Give the tests better names that reflect what they do, leaving
  interpretation of the (now quite high-dimensional) results to the reader:
  - `test_{throughput => postgres_seqscan}`
  - `test_{latency => random_reads}`
- Cut the permutations down to those we use in production. Runtime is
  about 2 min.
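
To make the new fixture concrete: the benchmark builds its snapshot roughly
like this (condensed from the `test_random_reads` changes below):

```python
from fixtures.pageserver.makelayers import l0stack

def build_snapshot_cb(neon_env_builder):
    env = neon_env_builder.init_start()
    endpoint = env.endpoints.create_start("main")
    # one Value::Delta per page, stacked `delta_stack_height` times
    l0stack.make_l0_stack(
        endpoint,
        l0stack.L0StackShape(logical_table_size_mib=50, delta_stack_height=20),
    )
    return env
```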

Refs
- concurrent IO epic https://github.com/neondatabase/neon/issues/9378 
- batching task: fixes https://github.com/neondatabase/neon/issues/9837

---------

Co-authored-by: Peter Bendel <peterbendel@neon.tech>
Commit a7ce323949 (parent 31026d5a3c), authored by Christian Schwarz on 2025-05-15 19:48:13 +02:00, committed by GitHub.
9 changed files with 387 additions and 73 deletions

View File

@@ -910,6 +910,11 @@ impl Key {
        self.field1 == 0x00 && self.field4 != 0 && self.field6 != 0xffffffff
    }

    #[inline(always)]
    pub fn is_rel_block_of_rel(&self, rel: Oid) -> bool {
        self.is_rel_block_key() && self.field4 == rel
    }

    #[inline(always)]
    pub fn is_rel_dir_key(&self) -> bool {
        self.field1 == 0x00

View File

@@ -65,6 +65,9 @@ pub(crate) struct Args {
    #[clap(long, default_value = "1")]
    queue_depth: NonZeroUsize,

    #[clap(long)]
    only_relnode: Option<u32>,

    targets: Option<Vec<TenantTimelineId>>,
}
@@ -206,7 +209,12 @@ async fn main_impl(
    for r in partitioning.keys.ranges.iter() {
        let mut i = r.start;
        while i != r.end {
            if i.is_rel_block_key() {
            let mut include = true;
            include &= i.is_rel_block_key();
            if let Some(only_relnode) = args.only_relnode {
                include &= i.is_rel_block_of_rel(only_relnode);
            }
            if include {
                filtered.add_key(i);
            }
            i = i.next();

View File

@@ -0,0 +1,59 @@
"""
Script that creates, in your running neon_local setup, a stack of L0 deltas,
each of which should have 1 Value::Delta per page of table `data`.
Use the bash snippet below to reset your neon_local environment.
The last line of the snippet runs this file.
```
export NEON_REPO_DIR=$PWD/.neon
export NEON_BIN_DIR=$PWD/target/release
$NEON_BIN_DIR/neon_local stop
rm -rf $NEON_REPO_DIR
$NEON_BIN_DIR/neon_local init
cat >> $NEON_REPO_DIR/pageserver_1/pageserver.toml <<"EOF"
# customizations
virtual_file_io_mode = "direct-rw"
page_service_pipelining={mode="pipelined", max_batch_size=32, execution="concurrent-futures"}
get_vectored_concurrent_io={mode="sidecar-task"}
EOF
$NEON_BIN_DIR/neon_local start
psql 'postgresql://localhost:1235/storage_controller' -c 'DELETE FROM tenant_shards'
sed 's/.*get_vectored_concurrent_io.*/get_vectored_concurrent_io={mode="sidecar-task"}/' -i $NEON_REPO_DIR/pageserver_1/pageserver.toml
$NEON_BIN_DIR/neon_local pageserver restart
sleep 2
$NEON_BIN_DIR/neon_local tenant create --set-default
./target/debug/neon_local endpoint stop foo
rm -rf $NEON_REPO_DIR/endpoints/foo
./target/debug/neon_local endpoint create foo
echo 'full_page_writes=off' >> $NEON_REPO_DIR/endpoints/foo/postgresql.conf
./target/debug/neon_local endpoint start foo
pushd test_runner; poetry run python3 -m bin.neon_local_create_deep_l0_stack 10; popd
```
"""
import sys
import psycopg2
from fixtures.common_types import TenantShardId, TimelineId
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.makelayers.l0stack import L0StackShape, make_l0_stack_standalone
ps_http = PageserverHttpClient(port=9898, is_testing_enabled_or_skip=lambda: None)
vps_http = PageserverHttpClient(port=1234, is_testing_enabled_or_skip=lambda: None)
tenants = ps_http.tenant_list()
assert len(tenants) == 1
tenant_shard_id = TenantShardId.parse(tenants[0]["id"])
timelines = ps_http.timeline_list(tenant_shard_id)
assert len(timelines) == 1
timeline_id = TimelineId(timelines[0]["timeline_id"])
connstr = "postgresql://cloud_admin@localhost:55432/postgres"
conn = psycopg2.connect(connstr)
shape = L0StackShape(logical_table_size_mib=50, delta_stack_height=int(sys.argv[1]))
make_l0_stack_standalone(vps_http, ps_http, tenant_shard_id, timeline_id, conn, shape)

View File

@@ -1377,7 +1377,11 @@ class NeonEnv:
            force=config.config_init_force,
        )

    def start(self, timeout_in_seconds: int | None = None):
    def start(
        self,
        timeout_in_seconds: int | None = None,
        extra_ps_env_vars: dict[str, str] | None = None,
    ):
        # Storage controller starts first, so that pageserver /re-attach calls don't
        # bounce through retries on startup
        self.storage_controller.start(timeout_in_seconds=timeout_in_seconds)
@@ -1396,7 +1400,10 @@ class NeonEnv:
        for pageserver in self.pageservers:
            futs.append(
                executor.submit(
                    lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds)  # type: ignore[misc]
                    lambda ps=pageserver: ps.start(  # type: ignore[misc]
                        extra_env_vars=extra_ps_env_vars or {},
                        timeout_in_seconds=timeout_in_seconds,
                    ),
                )
            )
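
Callers can now inject environment variables into every pageserver's process
at startup; a minimal sketch (the `RUST_LOG` value is an arbitrary
illustration, not something this PR sets):

```python
env.start(extra_ps_env_vars={"RUST_LOG": "debug"})  # forwarded to each ps.start()
```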

View File

@@ -0,0 +1,148 @@
from dataclasses import dataclass

from psycopg2.extensions import connection as PgConnection

from fixtures.common_types import Lsn, TenantShardId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import Endpoint
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn


@dataclass
class L0StackShape:
    logical_table_size_mib: int = 50
    delta_stack_height: int = 20


def make_l0_stack(endpoint: Endpoint, shape: L0StackShape):
    """
    Creates a stack of L0 deltas, each of which should have 1 Value::Delta per page of table `data`.
    """
    env = endpoint.env
    # TODO: wait for storcon to finish any reconciles before jumping to action here?
    description = env.storage_controller.tenant_describe(endpoint.tenant_id)
    shards = description["shards"]
    assert len(shards) == 1, "does not support sharding"
    tenant_shard_id = TenantShardId.parse(shards[0]["tenant_shard_id"])
    endpoint.config(["full_page_writes=off"])
    endpoint.reconfigure()
    ps = env.get_pageserver(shards[0]["node_attached"])
    timeline_id = endpoint.show_timeline_id()
    vps_http = env.storage_controller.pageserver_api()
    ps_http = ps.http_client()
    endpoint_conn = endpoint.connect()
    make_l0_stack_standalone(vps_http, ps_http, tenant_shard_id, timeline_id, endpoint_conn, shape)


def make_l0_stack_standalone(
    vps_http: PageserverHttpClient,
    ps_http: PageserverHttpClient,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    endpoint_conn: PgConnection,
    shape: L0StackShape,
):
    """
    See make_l0_stack for details.
    This function is a standalone version of make_l0_stack, usable from non-test code.
    """
    assert not tenant_shard_id.shard_index.is_sharded, (
        "the current implementation only supports unsharded tenants"
    )
    tenant_id = tenant_shard_id.tenant_id
    conn = endpoint_conn
    desired_size = shape.logical_table_size_mib * 1024 * 1024
    config = {
        "gc_period": "0s",  # disable periodic gc
        "checkpoint_timeout": "10 years",
        "compaction_period": "1h",  # doesn't matter, but a 0 value will kill walredo every 10s
        "compaction_threshold": 100000,  # we just want L0s
        "compaction_target_size": 134217728,
        "checkpoint_distance": 268435456,
        "image_creation_threshold": 100000,  # we just want L0s
    }
    vps_http.set_tenant_config(tenant_id, config)

    conn.autocommit = True
    cur = conn.cursor()

    # Ensure full_page_writes are disabled so that all Value::Delta in
    # the pageserver are !will_init, and therefore a getpage needs to read
    # the entire delta stack.
    cur.execute("SHOW full_page_writes")
    assert cur.fetchall()[0][0] == "off", "full_page_writes should be off"

    # each tuple is 23 (header) + 100 bytes = 123 bytes
    # page header is 24 bytes
    # 8k page size
    # (8k - 24 bytes) / 123 bytes = 63 tuples per page
    # set fillfactor to 10 to have 6 tuples per page
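    # with fillfactor=10, floor(63 * 0.10) = 6 tuples per page, matching the layout assertion below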
cur.execute("DROP TABLE IF EXISTS data")
cur.execute("CREATE TABLE data(id bigint, row char(92)) with (fillfactor=10)")
need_pages = desired_size // 8192
need_rows = need_pages * 6
log.info(f"Need {need_pages} pages, {need_rows} rows")
cur.execute(f"INSERT INTO data SELECT i,'row'||i FROM generate_series(1, {need_rows}) as i")
# Raise fillfactor to 100% so that all updates are HOT updates.
# We assert they're hot updates by checking fetch_id_to_page_mapping remains the same.
cur.execute("ALTER TABLE data SET (fillfactor=100)")
def settle_and_flush():
cur.execute("SELECT pg_current_wal_flush_lsn()")
flush_lsn = Lsn(cur.fetchall()[0][0])
wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, flush_lsn)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# create an L0 for the initial data we just inserted
settle_and_flush()
# assert we wrote what we think we wrote
cur.execute("""
with ntuples_per_page as (
select (ctid::text::point)[0]::bigint pageno,count(*) ntuples from data group by pageno
)
select ntuples, count(*) npages from ntuples_per_page group by ntuples order by ntuples;
""")
rows = cur.fetchall()
log.info(f"initial table layout: {rows}")
assert len(rows) == 1
assert rows[0][0] == 6, f"expected 6 tuples per page, got {rows[0][0]}"
assert rows[0][1] == need_pages, f"expected {need_pages} pages, got {rows[0][1]}"
def fetch_id_to_page_mapping():
cur.execute("""
SELECT id,(ctid::text::point)[0]::bigint pageno FROM data ORDER BY id
""")
return cur.fetchall()
initial_mapping = fetch_id_to_page_mapping()
# every iteration updates one tuple in each page
delta_stack_height = shape.delta_stack_height
for i in range(0, delta_stack_height):
log.info(i)
cur.execute(f"UPDATE data set row = row||',u' where id % 6 = {i % 6}")
log.info(f"modified rows: {cur.rowcount}")
assert cur.rowcount == need_pages
settle_and_flush()
post_update_mapping = fetch_id_to_page_mapping()
assert initial_mapping == post_update_mapping, "Postgres should be doing HOT updates"
# Assert the layer count is what we expect it is
layer_map = vps_http.layer_map_info(tenant_id, timeline_id)
assert (
len(layer_map.delta_l0_layers()) == delta_stack_height + 1 + 1
) # +1 for the initdb layer + 1 for the table creation & fill
assert len(layer_map.delta_l0_layers()) == len(layer_map.delta_layers()) # it's all L0s
assert len(layer_map.image_layers()) == 0 # no images

View File

@@ -15,7 +15,8 @@ Some handy pytest flags for local development:
- `-k` selects a test to run
- `--timeout=0` disables our default timeout of 300s (see `setup.cfg`)
- `--preserve-database-files` to skip cleanup
- `--out-dir` to produce a JSON with the recorded test metrics
- `--out-dir` to produce a JSON with the recorded test metrics.
  There is a post-processing tool at `test_runner/performance/out_dir_to_csv.py`.
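  For example: `python3 test_runner/performance/out_dir_to_csv.py <json_file> > metrics.csv` (the tool writes CSV to stdout).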
# What performance tests do we have and how we run them

View File

@@ -0,0 +1,57 @@
# Tool to convert the JSON output from running a perf test with `--out-dir` to a CSV that
# can be easily pasted into a spreadsheet for quick viz & analysis.
# Check the `./README.md` in this directory for `--out-dir`.
#
# TODO: add the pytest.mark.parametrize to the json and make them columns here
# https://github.com/neondatabase/neon/issues/11878
import csv
import json
import os
import sys

def json_to_csv(json_file):
    with open(json_file) as f:
        data = json.load(f)

    # Collect all possible metric names to form headers
    all_metrics = set()
    for result in data.get("result", []):
        for metric in result.get("data", []):
            all_metrics.add(metric["name"])

    # Sort metrics for consistent output
    metrics = sorted(all_metrics)

    # Create headers
    headers = ["suit"] + metrics

    # Prepare rows
    rows = []
    for result in data.get("result", []):
        row = {"suit": result["suit"]}
        # Initialize all metrics to empty
        for metric in metrics:
            row[metric] = ""
        # Fill in available metrics
        for item in result.get("data", []):
            row[item["name"]] = item["value"]
        rows.append(row)

    # Write to stdout as CSV
    writer = csv.DictWriter(sys.stdout, fieldnames=headers)
    writer.writeheader()
    writer.writerows(rows)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: python {os.path.basename(__file__)} <json_file>")
        sys.exit(1)
    json_file = sys.argv[1]
    json_to_csv(json_file)

View File

@@ -10,7 +10,8 @@ from typing import Any
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin
from fixtures.pageserver.makelayers import l0stack
from fixtures.utils import humantime_to_ms
TARGET_RUNTIME = 30
@@ -34,28 +35,18 @@ class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
mode: str = "pipelined"
EXECUTION = ["concurrent-futures"]
BATCHING = ["uniform-lsn", "scattered-lsn"]
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
    for execution in EXECUTION:
        for batching in BATCHING:
            NON_BATCHABLE.append(
                PageServicePipeliningConfigPipelined(max_batch_size, execution, batching)
            )

BATCHABLE: list[PageServicePipeliningConfig] = []
PS_IO_CONCURRENCY = ["sidecar-task"]
PIPELINING_CONFIGS: list[PageServicePipeliningConfig] = []
for max_batch_size in [32]:
    for execution in EXECUTION:
        for batching in BATCHING:
            BATCHABLE.append(
    for execution in ["concurrent-futures"]:
        for batching in ["scattered-lsn"]:
            PIPELINING_CONFIGS.append(
                PageServicePipeliningConfigPipelined(max_batch_size, execution, batching)
            )


@pytest.mark.parametrize(
    "tablesize_mib, pipelining_config, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
    "tablesize_mib, pipelining_config, target_runtime, ps_io_concurrency, effective_io_concurrency, readhead_buffer_size, name",
    [
        # batchable workloads should show throughput and CPU efficiency improvements
        *[
@@ -63,20 +54,23 @@ for max_batch_size in [32]:
                50,
                config,
                TARGET_RUNTIME,
                ps_io_concurrency,
                100,
                128,
                f"batchable {dataclasses.asdict(config)}",
            )
            for config in BATCHABLE
            for config in PIPELINING_CONFIGS
            for ps_io_concurrency in PS_IO_CONCURRENCY
        ],
    ],
)
def test_throughput(
def test_postgres_seqscan(
    neon_env_builder: NeonEnvBuilder,
    zenbenchmark: NeonBenchmarker,
    tablesize_mib: int,
    pipelining_config: PageServicePipeliningConfig,
    target_runtime: int,
    ps_io_concurrency: str,
    effective_io_concurrency: int,
    readhead_buffer_size: int,
    name: str,
@@ -97,6 +91,10 @@ def test_throughput(
    If the compute provides pipeline depth (effective_io_concurrency=100), then
    pipelining configs, especially with max_batch_size>1, should yield dramatic improvements
    in all performance metrics.

    We advance the LSN from a disruptor thread to simulate the effect of a workload with
    concurrent writes to another table. The `scattered-lsn` batching mode handles this well,
    whereas the initial implementation (`uniform-lsn`) would break the batch.
    """
    #
@@ -114,7 +112,19 @@ def test_throughput(
        }
    )

    # For storing configuration as a metric, insert a fake 0 whose labels carry the actual data
    params.update({"pipelining_config": (0, {"labels": dataclasses.asdict(pipelining_config)})})
    params.update(
        {
            "config": (
                0,
                {
                    "labels": {
                        "pipelining_config": dataclasses.asdict(pipelining_config),
                        "ps_io_concurrency": ps_io_concurrency,
                    }
                },
            )
        }
    )

    log.info("params: %s", params)
@@ -266,7 +276,10 @@ def test_throughput(
        return iters

    env.pageserver.patch_config_toml_nonrecursive(
        {"page_service_pipelining": dataclasses.asdict(pipelining_config)}
        {
            "page_service_pipelining": dataclasses.asdict(pipelining_config),
            "get_vectored_concurrent_io": {"mode": ps_io_concurrency},
        }
    )

    # set trace for log analysis below
@@ -318,77 +331,63 @@ def test_throughput(
    )
PRECISION_CONFIGS: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
for max_batch_size in [1, 32]:
    for execution in EXECUTION:
        for batching in BATCHING:
            PRECISION_CONFIGS.append(
                PageServicePipeliningConfigPipelined(max_batch_size, execution, batching)
            )


@pytest.mark.parametrize(
    "pipelining_config,name",
    [(config, f"{dataclasses.asdict(config)}") for config in PRECISION_CONFIGS],
    "pipelining_config,ps_io_concurrency,l0_stack_height,queue_depth,name",
    [
        (config, ps_io_concurrency, l0_stack_height, queue_depth, f"{dataclasses.asdict(config)}")
        for config in PIPELINING_CONFIGS
        for ps_io_concurrency in PS_IO_CONCURRENCY
        for queue_depth in [1, 2, 32]
        for l0_stack_height in [0, 20]
    ],
)
def test_latency(
def test_random_reads(
    neon_env_builder: NeonEnvBuilder,
    zenbenchmark: NeonBenchmarker,
    pg_bin: PgBin,
    pipelining_config: PageServicePipeliningConfig,
    ps_io_concurrency: str,
    l0_stack_height: int,
    queue_depth: int,
    name: str,
):
"""
Measure the latency impact of pipelining in an un-batchable workloads.
An ideal implementation should not increase average or tail latencies for such workloads.
We don't have support in pagebench to create queue depth yet.
=> https://github.com/neondatabase/neon/issues/9837
Throw pagebench random getpage at latest lsn workload from a single client against pageserver.
"""
#
# Setup
#
def build_snapshot_cb(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start("main")
l0stack.make_l0_stack(
endpoint,
l0stack.L0StackShape(logical_table_size_mib=50, delta_stack_height=l0_stack_height),
)
return env
env = neon_env_builder.build_and_use_snapshot(
f"test_page_service_batching--test_pagebench-{l0_stack_height}", build_snapshot_cb
)
    def patch_ps_config(ps_config):
        if pipelining_config is not None:
            ps_config["page_service_pipelining"] = dataclasses.asdict(pipelining_config)
        ps_config["page_service_pipelining"] = dataclasses.asdict(pipelining_config)
        ps_config["get_vectored_concurrent_io"] = {"mode": ps_io_concurrency}

    neon_env_builder.pageserver_config_override = patch_ps_config
    env.pageserver.edit_config_toml(patch_ps_config)

    env = neon_env_builder.init_start()
    endpoint = env.endpoints.create_start("main")
    conn = endpoint.connect()
    cur = conn.cursor()
    env.start()

    cur.execute("SET max_parallel_workers_per_gather=0")  # disable parallel backends
    cur.execute("SET effective_io_concurrency=1")
    cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
    cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
    log.info("Filling the table")
    cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
    tablesize = 50 * 1024 * 1024
    npages = tablesize // (8 * 1024)
    cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
    # TODO: can we force postgres to do sequential scans?
    cur.close()
    conn.close()
    wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
    endpoint.stop()

    lsn = env.safekeepers[0].get_commit_lsn(env.initial_tenant, env.initial_timeline)
    ep = env.endpoints.create_start("main", lsn=lsn)
    data_table_relnode_oid = ep.safe_psql_scalar("SELECT 'data'::regclass::oid")
    ep.stop_and_destroy()

    for sk in env.safekeepers:
        sk.stop()
    #
    # Run single-threaded pagebench (TODO: dedup with other benchmark code)
    #

    env.pageserver.allowed_errors.append(
        # https://github.com/neondatabase/neon/issues/6925
        r".*query handler for.*pagestream.*failed: unexpected message: CopyFail during COPY.*"
@@ -396,6 +395,8 @@ def test_latency(
    ps_http = env.pageserver.http_client()

    metrics_before = ps_http.get_metrics()

    cmd = [
        str(env.neon_binpath / "pagebench"),
        "get-page-latest-lsn",
@@ -405,6 +406,10 @@ def test_latency(
        env.pageserver.connstr(password=None),
        "--num-clients",
        "1",
        "--queue-depth",
        str(queue_depth),
        "--only-relnode",
        str(data_table_relnode_oid),
        "--runtime",
        "10s",
    ]
@@ -413,12 +418,22 @@ def test_latency(
    results_path = Path(basepath + ".stdout")
    log.info(f"Benchmark results at: {results_path}")

    metrics_after = ps_http.get_metrics()

    with open(results_path) as f:
        results = json.load(f)
    log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")

    total = results["total"]

    metric = "request_count"
    zenbenchmark.record(
        metric,
        metric_value=total[metric],
        unit="",
        report=MetricReport.HIGHER_IS_BETTER,
    )

    metric = "latency_mean"
    zenbenchmark.record(
        metric,
@@ -435,3 +450,17 @@ def test_latency(
        unit="ms",
        report=MetricReport.LOWER_IS_BETTER,
    )
    reads_before = metrics_before.query_one(
        "pageserver_io_operations_seconds_count", filter={"operation": "read"}
    )
    reads_after = metrics_after.query_one(
        "pageserver_io_operations_seconds_count", filter={"operation": "read"}
    )

    zenbenchmark.record(
        "virtual_file_reads",
        metric_value=reads_after.value - reads_before.value,
        unit="",
        report=MetricReport.LOWER_IS_BETTER,
    )