Compare commits

...

40 Commits

Author SHA1 Message Date
Christian Schwarz
09e7485004 Merge branch 'problame/merge-getpage-test' into problame/batching-timer 2024-11-21 11:28:12 +01:00
Christian Schwarz
058b35f884 Merge branch 'problame/batching-benchmark' into problame/merge-getpage-test 2024-11-21 11:27:16 +01:00
Christian Schwarz
ff0aa152f1 Merge remote-tracking branch 'origin/main' into problame/batching-benchmark 2024-11-21 11:25:23 +01:00
Christian Schwarz
3375f28990 pytest.approx; https://github.com/neondatabase/neon/pull/9820#discussion_r1850679974 2024-11-21 11:21:50 +01:00
Christian Schwarz
e82deb2ccc high-resolution CPU usage 2024-11-21 11:16:00 +01:00
Christian Schwarz
fa7ce2ca07 the final choice: async-timer 1.0beta15 with features=["tokio1"] 2024-11-21 11:15:02 +01:00
Christian Schwarz
89b6cb8eba Revert "vanilla tokio based timer impl based on tokio::time::Sleep"
This reverts commit 517dda849f.
2024-11-20 20:17:49 +01:00
Christian Schwarz
c68661dfb3 Revert "undo local modifications to benchmark"
This reverts commit 7be13bc5a6.
2024-11-20 19:53:06 +01:00
Christian Schwarz
517dda849f vanilla tokio based timer impl based on tokio::time::Sleep 2024-11-20 19:52:47 +01:00
Christian Schwarz
f22ad868cf Revert "tokio_timerfd::Delay based impl"
This reverts commit fcda7a72c6.
2024-11-20 19:45:37 +01:00
Christian Schwarz
fcda7a72c6 tokio_timerfd::Delay based impl
Performs identically great to the async-timer::Timer features=tokio1 impl
Makes sense because it's the same thing that's happening under the hood.

https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e004780ea9decc82281f6b8d1
2024-11-20 19:42:00 +01:00
Christian Schwarz
469ce810fc Revert "async-timer based approach (again, with data)"
This reverts commit 689788cbba.
2024-11-20 19:40:24 +01:00
Christian Schwarz
21866faa8a Revert "try async-timer 1.0.0-beta15 (still signal-based timers)"
This reverts commit c73e9e40e9.
2024-11-20 19:37:51 +01:00
Christian Schwarz
cbb5817997 Revert "async-timer 1.0.0-beta15 with features=tokio1"
This reverts commit 68550f0f50.
2024-11-20 19:37:44 +01:00
Christian Schwarz
5f3e6f398c Revert "try interval-based impl to cross-chec"
This reverts commit 721643beed.
2024-11-20 18:52:55 +01:00
Christian Schwarz
721643beed try interval-based impl to cross-chec
=> zero batching

https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e00478065a9b3e51726082885
2024-11-20 18:50:48 +01:00
Christian Schwarz
68550f0f50 async-timer 1.0.0-beta15 with features=tokio1
Best batching factor so far with no worse degradation of
un-batchable workloads than the other candidates.

https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e004780c0921fe99e1da0e8c9
2024-11-20 18:41:31 +01:00
Christian Schwarz
c73e9e40e9 try async-timer 1.0.0-beta15 (still signal-based timers)
Results unchanged to 0.7.4

https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e004780e18416cc0faf2aca65
2024-11-20 18:32:53 +01:00
Christian Schwarz
7be13bc5a6 undo local modifications to benchmark 2024-11-20 16:00:19 +01:00
Christian Schwarz
689788cbba async-timer based approach (again, with data)
Yep, it's clearly the best one with best batching factor at lowest CPU
usage.

https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e004780d0a205e081458b46db
2024-11-20 15:36:10 +01:00
Christian Schwarz
f9bf038d2c Revert "tokio_timerfd::Interval"
This reverts commit 12124b28d0.
2024-11-20 15:25:52 +01:00
Christian Schwarz
12124b28d0 tokio_timerfd::Interval
Resolution not high enough to do _any_ batching at 10us or 20us

https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e0047800fb74bd8f4ab6cf8e2
2024-11-20 15:25:14 +01:00
Christian Schwarz
1d85bec0ea Revert "tokio::time::Interval based approach"
This reverts commit 81d99704ee.
2024-11-20 15:13:26 +01:00
Christian Schwarz
81d99704ee tokio::time::Interval based approach
batching at 10us doesn't work well enough, prob the future is ready
too soon. batching factor is just 1.5

https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e004780b79c8dd6d007dbb120
2024-11-20 15:13:11 +01:00
Christian Schwarz
f3ed5692ea Revert "async-timer based approach"
This reverts commit 1639b26002.
2024-11-20 14:49:09 +01:00
Christian Schwarz
1639b26002 async-timer based approach
With this, 10us batching timeout works, but it has some other wrinkles:

- it uses the signal-based timer APIs instead of going through epoll (=> timerfd)
= it needs to make a syscall for each batch, which costs around 1-2us, so, probably significant CPU time wasted on this.
2024-11-20 14:49:01 +01:00
Christian Schwarz
af95320a8c Revert "Revert "switch back to tokio::time::sleep, to get the numbers""
This reverts commit aa695b2ad7.
2024-11-20 14:25:05 +01:00
Christian Schwarz
b299eb19e2 fixup whitespace stuff 2024-11-20 14:23:55 +01:00
Christian Schwarz
88d52b31b7 Merge branch 'problame/batching-benchmark' into problame/merge-getpage-test 2024-11-20 14:23:22 +01:00
Christian Schwarz
aa695b2ad7 Revert "switch back to tokio::time::sleep, to get the numbers"
This reverts commit b9746168ff.
2024-11-20 14:22:31 +01:00
Christian Schwarz
b695907752 page_service: add benchmark for batching
This PR adds a benchmark to demonstrate the effect of server-side
getpage request batching added in https://github.com/neondatabase/neon/pull/9321.

Refs:

- Epic: https://github.com/neondatabase/neon/issues/9376
- Extracted from https://github.com/neondatabase/neon/pull/9792
2024-11-20 14:18:42 +01:00
Christian Schwarz
75041cb61b bench fixups 2024-11-20 13:59:21 +01:00
Christian Schwarz
e80ce970f7 collect CPU utilization 2024-11-20 13:55:05 +01:00
Christian Schwarz
f2de5b504f make it a proper benchmark 2024-11-20 13:52:05 +01:00
Christian Schwarz
b9746168ff switch back to tokio::time::sleep, to get the numbers
=> https://www.notion.so/neondatabase/benchmarking-notes-143f189e004780c4a630cb5f426e39ba?pvs=4#144f189e00478054b8a3e325735ffa19

=> unacceptable
2024-11-20 12:50:29 +01:00
Christian Schwarz
5cc0059088 parametrize more test 2024-11-20 12:48:47 +01:00
Christian Schwarz
911946a3cd fixes 2024-11-19 19:01:55 +01:00
Christian Schwarz
61ff84a3a2 compiles 2024-11-19 18:40:56 +01:00
Christian Schwarz
15e21c714b got it working and turn it more into a benchmark 2024-11-18 23:57:14 +01:00
Christian Schwarz
0689965282 WIP: page_service: add basic testcase for merging
The steps in the test work in neon_local + psql
but for some reason they don't work in the test.

Asked compute team on Slack for help:
https://neondb.slack.com/archives/C04DGM6SMTM/p1731952688386789
2024-11-18 23:57:14 +01:00
7 changed files with 358 additions and 63 deletions

20
Cargo.lock generated
View File

@@ -244,6 +244,19 @@ dependencies = [
"syn 2.0.52",
]
[[package]]
name = "async-timer"
version = "1.0.0-beta.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d420af8e042475e58a20d91af8eda7d6528989418c03f3f527e1c3415696f70"
dependencies = [
"error-code",
"libc",
"tokio",
"wasm-bindgen",
"web-time",
]
[[package]]
name = "async-trait"
version = "0.1.68"
@@ -1920,6 +1933,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "error-code"
version = "3.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5d9305ccc6942a704f4335694ecd3de2ea531b114ac2d51f5f843750787a92f"
[[package]]
name = "event-listener"
version = "2.5.3"
@@ -3590,6 +3609,7 @@ dependencies = [
"arc-swap",
"async-compression",
"async-stream",
"async-timer",
"bit_field",
"byteorder",
"bytes",

View File

@@ -47,6 +47,7 @@ anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
async-timer = { version= "1.0.0-beta.15", features = ["tokio1"] }
azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }

View File

@@ -2,14 +2,28 @@
// This module has heavy inspiration from the prometheus crate's `process_collector.rs`.
use once_cell::sync::Lazy;
use prometheus::Gauge;
use crate::UIntGauge;
pub struct Collector {
descs: Vec<prometheus::core::Desc>,
vmlck: crate::UIntGauge,
cpu_seconds_highres: Gauge,
}
const NMETRICS: usize = 1;
const NMETRICS: usize = 2;
static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
if long == -1 {
panic!("sysconf(_SC_CLK_TCK) failed");
}
let convertible_to_f64: i32 =
i32::try_from(long).expect("sysconf(_SC_CLK_TCK) is larger than i32");
convertible_to_f64 as f64
});
impl prometheus::core::Collector for Collector {
fn desc(&self) -> Vec<&prometheus::core::Desc> {
@@ -27,6 +41,12 @@ impl prometheus::core::Collector for Collector {
mfs.extend(self.vmlck.collect())
}
}
if let Ok(stat) = myself.stat() {
let cpu_seconds = stat.utime + stat.stime;
self.cpu_seconds_highres
.set(cpu_seconds as f64 / *CLK_TCK_F64);
mfs.extend(self.cpu_seconds_highres.collect());
}
mfs
}
}
@@ -43,7 +63,23 @@ impl Collector {
.cloned(),
);
Self { descs, vmlck }
let cpu_seconds_highres = Gauge::new(
"libmetrics_process_cpu_seconds_highres",
"Total user and system CPU time spent in seconds.\
Sub-second resolution, hence better than `process_cpu_seconds_total`.",
)
.unwrap();
descs.extend(
prometheus::core::Collector::desc(&cpu_seconds_highres)
.into_iter()
.cloned(),
);
Self {
descs,
vmlck,
cpu_seconds_highres,
}
}
}

View File

@@ -15,6 +15,7 @@ anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
async-timer.workspace = true
bit_field.workspace = true
byteorder.workspace = true
bytes.workspace = true

View File

@@ -3,6 +3,7 @@
use anyhow::{bail, Context};
use async_compression::tokio::write::GzipEncoder;
use async_timer::Timer;
use bytes::Buf;
use futures::FutureExt;
use itertools::Itertools;
@@ -22,6 +23,7 @@ use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
use std::borrow::Cow;
use std::io;
use std::pin::Pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
@@ -314,11 +316,15 @@ struct PageServerHandler {
timeline_handles: TimelineHandles,
/// Messages queued up for the next processing batch
next_batch: Option<BatchedFeMessage>,
/// See [`PageServerConf::server_side_batch_timeout`]
server_side_batch_timeout: Option<Duration>,
server_side_batch_timer: Pin<Box<async_timer::timer::Platform>>,
}
struct Carry {
msg: BatchedFeMessage,
started_at: Instant,
}
struct TimelineHandles {
@@ -582,8 +588,8 @@ impl PageServerHandler {
connection_ctx,
timeline_handles: TimelineHandles::new(tenant_manager),
cancel,
next_batch: None,
server_side_batch_timeout,
server_side_batch_timer: Box::pin(async_timer::new_timer(Duration::from_secs(999))), // reset each iteration
}
}
@@ -611,44 +617,86 @@ impl PageServerHandler {
)
}
#[instrument(skip_all, level = tracing::Level::TRACE)]
async fn read_batch_from_connection<IO>(
&mut self,
pgb: &mut PostgresBackend<IO>,
tenant_id: &TenantId,
timeline_id: &TimelineId,
maybe_carry: &mut Option<Carry>,
ctx: &RequestContext,
) -> Result<Option<BatchOrEof>, QueryError>
) -> Result<BatchOrEof, QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
let mut batch = self.next_batch.take();
let mut batch_started_at: Option<std::time::Instant> = None;
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
let next_batch: Option<BatchedFeMessage> = loop {
let sleep_fut = match (self.server_side_batch_timeout, batch_started_at) {
(Some(batch_timeout), Some(started_at)) => futures::future::Either::Left(
tokio::time::sleep_until((started_at + batch_timeout).into()),
),
_ => futures::future::Either::Right(futures::future::pending()),
let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?
loop {
// Create a future that will become ready when we need to stop batching.
use futures::future::Either;
let batching_deadline = match (
&*maybe_carry as &Option<Carry>,
&mut batching_deadline_storage,
) {
(None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
(None, Some(_)) => unreachable!(),
(Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
(Some(carry), None) => {
match self.server_side_batch_timeout {
None => {
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]))
}
Some(batch_timeout) => {
// Take into consideration the time the carry spent waiting.
let batch_timeout =
batch_timeout.saturating_sub(carry.started_at.elapsed());
if batch_timeout.is_zero() {
// the timer doesn't support restarting with zero duration
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]));
} else {
self.server_side_batch_timer.restart(batch_timeout);
batching_deadline_storage = Some(&mut self.server_side_batch_timer);
Either::Right(
batching_deadline_storage.as_mut().expect("we just set it"),
)
}
}
}
}
};
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => {
msg
}
_ = sleep_fut => {
assert!(batch.is_some());
break None;
_ = batching_deadline => {
return Ok(BatchOrEof::Batch(smallvec::smallvec![maybe_carry.take().expect("per construction of batching_deadline").msg]));
}
msg = pgb.read_message() => { msg }
};
let msg_start = Instant::now();
// Rest of this loop body is trying to batch `msg` into `batch`.
// If we can add msg to batch we continue into the next loop iteration.
// If we can't add msg to batch batch, we carry `msg` over to the next call.
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => {
return Ok(Some(BatchOrEof::Eof));
return Ok(BatchOrEof::Eof);
}
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
@@ -656,10 +704,11 @@ impl PageServerHandler {
)));
}
None => {
return Ok(Some(BatchOrEof::Eof));
return Ok(BatchOrEof::Eof);
} // client disconnected
};
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
// parse request
@@ -701,11 +750,11 @@ impl PageServerHandler {
span,
error: $error,
};
let batch_and_error = match batch {
Some(b) => smallvec::smallvec![b, error],
let batch_and_error = match maybe_carry.take() {
Some(carry) => smallvec::smallvec![carry.msg, error],
None => smallvec::smallvec![error],
};
Ok(Some(BatchOrEof::Batch(batch_and_error)))
Ok(BatchOrEof::Batch(batch_and_error))
}};
}
@@ -758,26 +807,20 @@ impl PageServerHandler {
}
};
let batch_timeout = match self.server_side_batch_timeout {
Some(value) => value,
None => {
// Batching is not enabled - stop on the first message.
return Ok(Some(BatchOrEof::Batch(smallvec::smallvec![this_msg])));
}
};
// check if we can batch
match (&mut batch, this_msg) {
//
// batch
//
match (maybe_carry.as_mut(), this_msg) {
(None, this_msg) => {
batch = Some(this_msg);
*maybe_carry = Some(Carry { msg: this_msg, started_at: msg_start });
}
(
Some(BatchedFeMessage::GetPage {
Some(Carry { msg: BatchedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: accum_pages,
pages: ref mut accum_pages,
effective_request_lsn: accum_lsn,
}),
}, started_at: _}),
BatchedFeMessage::GetPage {
span: _,
shard: this_shard,
@@ -787,12 +830,14 @@ impl PageServerHandler {
) if async {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
// TODO: we _could_ batch & execute each shard seperately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
@@ -800,6 +845,7 @@ impl PageServerHandler {
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
if *accum_lsn != this_lsn {
trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
return false;
}
true
@@ -809,21 +855,17 @@ impl PageServerHandler {
// ok to batch
accum_pages.extend(this_pages);
}
(Some(_), this_msg) => {
(Some(carry), this_msg) => {
// by default, don't continue batching
break Some(this_msg);
let carry = std::mem::replace(carry,
Carry {
msg: this_msg,
started_at: msg_start,
});
return Ok(BatchOrEof::Batch(smallvec::smallvec![carry.msg]));
}
}
// batching impl piece
let started_at = batch_started_at.get_or_insert_with(Instant::now);
if started_at.elapsed() > batch_timeout {
break None;
}
};
self.next_batch = next_batch;
Ok(batch.map(|b| BatchOrEof::Batch(smallvec::smallvec![b])))
}
}
/// Pagestream sub-protocol handler.
@@ -861,22 +903,17 @@ impl PageServerHandler {
}
}
// If [`PageServerHandler`] is reused for multiple pagestreams,
// then make sure to not process requests from the previous ones.
self.next_batch = None;
let mut carry: Option<Carry> = None;
loop {
let maybe_batched = self
.read_batch_from_connection(pgb, &tenant_id, &timeline_id, &ctx)
.read_batch_from_connection(pgb, &tenant_id, &timeline_id, &mut carry, &ctx)
.await?;
let batched = match maybe_batched {
Some(BatchOrEof::Batch(b)) => b,
Some(BatchOrEof::Eof) => {
BatchOrEof::Batch(b) => b,
BatchOrEof::Eof => {
break;
}
None => {
continue;
}
};
for batch in batched {
@@ -922,6 +959,7 @@ impl PageServerHandler {
(
{
let npages = pages.len();
trace!(npages, "handling getpage request");
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,

View File

@@ -15,6 +15,7 @@ Some handy pytest flags for local development:
- `-k` selects a test to run
- `--timeout=0` disables our default timeout of 300s (see `setup.cfg`)
- `--preserve-database-files` to skip cleanup
- `--out-dir` to produce a JSON with the recorded test metrics
# What performance tests do we have and how we run them
@@ -36,6 +37,6 @@ All tests run only once. Usually to obtain more consistent performance numbers,
## Results collection
Local test results for main branch, and results of daily performance tests, are stored in a neon project deployed in production environment. There is a Grafana dashboard that visualizes the results. Here is the [dashboard](https://observer.zenith.tech/d/DGKBm9Jnz/perf-test-results?orgId=1). The main problem with it is the unavailability to point at particular commit, though the data for that is available in the database. Needs some tweaking from someone who knows Grafana tricks.
Local test results for main branch, and results of daily performance tests, are stored in a [neon project](https://console.neon.tech/app/projects/withered-sky-69117821) deployed in production environment. There is a Grafana dashboard that visualizes the results. Here is the [dashboard](https://observer.zenith.tech/d/DGKBm9Jnz/perf-test-results?orgId=1). The main problem with it is the unavailability to point at particular commit, though the data for that is available in the database. Needs some tweaking from someone who knows Grafana tricks.
There is also an inconsistency in test naming. Test name should be the same across platforms, and results can be differentiated by the platform field. But currently, platform is sometimes included in test name because of the way how parametrization works in pytest. I.e. there is a platform switch in the dashboard with neon-local-ci and neon-staging variants. I.e. some tests under neon-local-ci value for a platform switch are displayed as `Test test_runner/performance/test_bulk_insert.py::test_bulk_insert[vanilla]` and `Test test_runner/performance/test_bulk_insert.py::test_bulk_insert[neon]` which is highly confusing.

View File

@@ -0,0 +1,198 @@
import dataclasses
import time
from dataclasses import dataclass
from typing import Any, Optional
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import humantime_to_ms
TARGET_RUNTIME = 5
@pytest.mark.parametrize(
"tablesize_mib, batch_timeout, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
[
# the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
(50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"),
(50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"),
(50, "20us", TARGET_RUNTIME, 1, 128, "not batchable 20us timeout"),
(50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"),
# the next 4 cases demonstrate how batchable workloads benefit from batching
(50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"),
(50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"),
(50, "20us", TARGET_RUNTIME, 100, 128, "batchable 20us timeout"),
(50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"),
(50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"),
],
)
def test_getpage_merge_smoke(
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
tablesize_mib: int,
batch_timeout: Optional[str],
target_runtime: int,
effective_io_concurrency: int,
readhead_buffer_size: int,
name: str,
):
"""
Do a bunch of sequential scans and ensure that the pageserver does some merging.
"""
#
# record perf-related parameters as metrics to simplify processing of results
#
params: dict[str, tuple[float | int, dict[str, Any]]] = {}
params.update(
{
"tablesize_mib": (tablesize_mib, {"unit": "MiB"}),
"batch_timeout": (
-1 if batch_timeout is None else 1e3 * humantime_to_ms(batch_timeout),
{"unit": "us"},
),
# target_runtime is just a polite ask to the workload to run for this long
"effective_io_concurrency": (effective_io_concurrency, {}),
"readhead_buffer_size": (readhead_buffer_size, {}),
# name is not a metric
}
)
log.info("params: %s", params)
for param, (value, kwargs) in params.items():
zenbenchmark.record(
param,
metric_value=value,
unit=kwargs.pop("unit", ""),
report=MetricReport.TEST_PARAM,
**kwargs,
)
#
# Setup
#
env = neon_env_builder.init_start()
ps_http = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("SET max_parallel_workers_per_gather=0") # disable parallel backends
cur.execute(f"SET effective_io_concurrency={effective_io_concurrency}")
cur.execute(
f"SET neon.readahead_buffer_size={readhead_buffer_size}"
) # this is the current default value, but let's hard-code that
cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
log.info("Filling the table")
cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
tablesize = tablesize_mib * 1024 * 1024
npages = tablesize // (8 * 1024)
cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
# TODO: can we force postgres to do sequential scans?
#
# Run the workload, collect `Metrics` before and after, calculate difference, normalize.
#
@dataclass
class Metrics:
time: float
pageserver_getpage_count: float
pageserver_vectored_get_count: float
compute_getpage_count: float
pageserver_cpu_seconds_total: float
def __sub__(self, other: "Metrics") -> "Metrics":
return Metrics(
time=self.time - other.time,
pageserver_getpage_count=self.pageserver_getpage_count
- other.pageserver_getpage_count,
pageserver_vectored_get_count=self.pageserver_vectored_get_count
- other.pageserver_vectored_get_count,
compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count,
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total
- other.pageserver_cpu_seconds_total,
)
def normalize(self, by) -> "Metrics":
return Metrics(
time=self.time / by,
pageserver_getpage_count=self.pageserver_getpage_count / by,
pageserver_vectored_get_count=self.pageserver_vectored_get_count / by,
compute_getpage_count=self.compute_getpage_count / by,
pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by,
)
def get_metrics() -> Metrics:
with conn.cursor() as cur:
cur.execute(
"select value from neon_perf_counters where metric='getpage_wait_seconds_count';"
)
compute_getpage_count = cur.fetchall()[0][0]
pageserver_metrics = ps_http.get_metrics()
return Metrics(
time=time.time(),
pageserver_getpage_count=pageserver_metrics.query_one(
"pageserver_smgr_query_seconds_count", {"smgr_query_type": "get_page_at_lsn"}
).value,
pageserver_vectored_get_count=pageserver_metrics.query_one(
"pageserver_get_vectored_seconds_count", {"task_kind": "PageRequestHandler"}
).value,
compute_getpage_count=compute_getpage_count,
pageserver_cpu_seconds_total=pageserver_metrics.query_one(
"libmetrics_process_cpu_seconds_highres"
).value,
)
def workload() -> Metrics:
start = time.time()
iters = 0
while time.time() - start < target_runtime or iters < 2:
log.info("Seqscan %d", iters)
if iters == 1:
# round zero for warming up
before = get_metrics()
cur.execute(
"select clear_buffer_cache()"
) # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
cur.execute("select sum(data::bigint) from t")
assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
iters += 1
after = get_metrics()
return (after - before).normalize(iters - 1)
env.pageserver.patch_config_toml_nonrecursive({"server_side_batch_timeout": batch_timeout})
env.pageserver.restart()
metrics = workload()
log.info("Results: %s", metrics)
#
# Sanity-checks on the collected data
#
# assert that getpage counts roughly match between compute and ps
assert metrics.pageserver_getpage_count == pytest.approx(
metrics.compute_getpage_count, rel=0.01
)
#
# Record the results
#
for metric, value in dataclasses.asdict(metrics).items():
zenbenchmark.record(f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM)
zenbenchmark.record(
"perfmetric.batching_factor",
metrics.pageserver_getpage_count / metrics.pageserver_vectored_get_count,
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)