Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-21 04:12:55 +00:00)

Compare commits: conrad/pro ... problame/b

40 Commits

| SHA1 |
|---|
| 09e7485004 |
| 058b35f884 |
| ff0aa152f1 |
| 3375f28990 |
| e82deb2ccc |
| fa7ce2ca07 |
| 89b6cb8eba |
| c68661dfb3 |
| 517dda849f |
| f22ad868cf |
| fcda7a72c6 |
| 469ce810fc |
| 21866faa8a |
| cbb5817997 |
| 5f3e6f398c |
| 721643beed |
| 68550f0f50 |
| c73e9e40e9 |
| 7be13bc5a6 |
| 689788cbba |
| f9bf038d2c |
| 12124b28d0 |
| 1d85bec0ea |
| 81d99704ee |
| f3ed5692ea |
| 1639b26002 |
| af95320a8c |
| b299eb19e2 |
| 88d52b31b7 |
| aa695b2ad7 |
| b695907752 |
| 75041cb61b |
| e80ce970f7 |
| f2de5b504f |
| b9746168ff |
| 5cc0059088 |
| 911946a3cd |
| 61ff84a3a2 |
| 15e21c714b |
| 0689965282 |

Cargo.lock (generated): 20 lines added
@@ -244,6 +244,19 @@ dependencies = [
  "syn 2.0.52",
 ]
 
+[[package]]
+name = "async-timer"
+version = "1.0.0-beta.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d420af8e042475e58a20d91af8eda7d6528989418c03f3f527e1c3415696f70"
+dependencies = [
+ "error-code",
+ "libc",
+ "tokio",
+ "wasm-bindgen",
+ "web-time",
+]
+
 [[package]]
 name = "async-trait"
 version = "0.1.68"
@@ -1920,6 +1933,12 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "error-code"
+version = "3.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a5d9305ccc6942a704f4335694ecd3de2ea531b114ac2d51f5f843750787a92f"
+
 [[package]]
 name = "event-listener"
 version = "2.5.3"
@@ -3590,6 +3609,7 @@ dependencies = [
  "arc-swap",
  "async-compression",
  "async-stream",
+ "async-timer",
  "bit_field",
  "byteorder",
  "bytes",

@@ -47,6 +47,7 @@ anyhow = { version = "1.0", features = ["backtrace"] }
 arc-swap = "1.6"
 async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
 atomic-take = "1.1.0"
+async-timer = { version= "1.0.0-beta.15", features = ["tokio1"] }
 azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
 azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
 azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }

@@ -2,14 +2,28 @@
 
 // This module has heavy inspiration from the prometheus crate's `process_collector.rs`.
 
+use once_cell::sync::Lazy;
+use prometheus::Gauge;
+
 use crate::UIntGauge;
 
 pub struct Collector {
     descs: Vec<prometheus::core::Desc>,
     vmlck: crate::UIntGauge,
+    cpu_seconds_highres: Gauge,
 }
 
-const NMETRICS: usize = 1;
+const NMETRICS: usize = 2;
+
+static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
+    let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
+    if long == -1 {
+        panic!("sysconf(_SC_CLK_TCK) failed");
+    }
+    let convertible_to_f64: i32 =
+        i32::try_from(long).expect("sysconf(_SC_CLK_TCK) is larger than i32");
+    convertible_to_f64 as f64
+});
 
 impl prometheus::core::Collector for Collector {
     fn desc(&self) -> Vec<&prometheus::core::Desc> {
@@ -27,6 +41,12 @@ impl prometheus::core::Collector for Collector {
                 mfs.extend(self.vmlck.collect())
             }
         }
+        if let Ok(stat) = myself.stat() {
+            let cpu_seconds = stat.utime + stat.stime;
+            self.cpu_seconds_highres
+                .set(cpu_seconds as f64 / *CLK_TCK_F64);
+            mfs.extend(self.cpu_seconds_highres.collect());
+        }
         mfs
     }
 }
@@ -43,7 +63,23 @@ impl Collector {
                 .cloned(),
         );
 
-        Self { descs, vmlck }
+        let cpu_seconds_highres = Gauge::new(
+            "libmetrics_process_cpu_seconds_highres",
+            "Total user and system CPU time spent in seconds.\
+             Sub-second resolution, hence better than `process_cpu_seconds_total`.",
+        )
+        .unwrap();
+        descs.extend(
+            prometheus::core::Collector::desc(&cpu_seconds_highres)
+                .into_iter()
+                .cloned(),
+        );
+
+        Self {
+            descs,
+            vmlck,
+            cpu_seconds_highres,
+        }
     }
 }

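For orientation, the new `libmetrics_process_cpu_seconds_highres` gauge above is simply the process's utime and stime tick counters divided by the kernel clock-tick rate. Below is a minimal standalone sketch of that conversion, assuming only the `libc` crate; the tick counts in `main` are made up for illustration and are not taken from this change.

```rust
// Convert scheduler ticks (as reported in /proc/<pid>/stat utime/stime) into
// seconds by dividing by the kernel clock-tick rate, mirroring the collector above.
fn clk_tck() -> f64 {
    // SAFETY: sysconf takes no pointers and only reads a configuration value.
    let ticks = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
    assert!(ticks > 0, "sysconf(_SC_CLK_TCK) failed");
    ticks as f64
}

fn cpu_seconds(utime_ticks: u64, stime_ticks: u64) -> f64 {
    (utime_ticks + stime_ticks) as f64 / clk_tck()
}

fn main() {
    // Hypothetical tick counts; on a typical 100 Hz kernel this prints 2.
    println!("{}", cpu_seconds(150, 50));
}
```
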
@@ -15,6 +15,7 @@ anyhow.workspace = true
 arc-swap.workspace = true
 async-compression.workspace = true
 async-stream.workspace = true
+async-timer.workspace = true
 bit_field.workspace = true
 byteorder.workspace = true
 bytes.workspace = true

@@ -3,6 +3,7 @@
 
 use anyhow::{bail, Context};
 use async_compression::tokio::write::GzipEncoder;
+use async_timer::Timer;
 use bytes::Buf;
 use futures::FutureExt;
 use itertools::Itertools;
@@ -22,6 +23,7 @@ use pq_proto::FeStartupPacket;
 use pq_proto::{BeMessage, FeMessage, RowDescriptor};
 use std::borrow::Cow;
 use std::io;
+use std::pin::Pin;
 use std::str;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -314,11 +316,15 @@ struct PageServerHandler {
 
     timeline_handles: TimelineHandles,
 
-    /// Messages queued up for the next processing batch
-    next_batch: Option<BatchedFeMessage>,
-
     /// See [`PageServerConf::server_side_batch_timeout`]
     server_side_batch_timeout: Option<Duration>,
+
+    server_side_batch_timer: Pin<Box<async_timer::timer::Platform>>,
 }
+
+struct Carry {
+    msg: BatchedFeMessage,
+    started_at: Instant,
+}
 
 struct TimelineHandles {
@@ -582,8 +588,8 @@ impl PageServerHandler {
             connection_ctx,
             timeline_handles: TimelineHandles::new(tenant_manager),
             cancel,
-            next_batch: None,
             server_side_batch_timeout,
+            server_side_batch_timer: Box::pin(async_timer::new_timer(Duration::from_secs(999))), // reset each iteration
         }
     }
@@ -611,44 +617,86 @@ impl PageServerHandler {
         )
     }
 
     #[instrument(skip_all, level = tracing::Level::TRACE)]
     async fn read_batch_from_connection<IO>(
         &mut self,
         pgb: &mut PostgresBackend<IO>,
         tenant_id: &TenantId,
         timeline_id: &TimelineId,
+        maybe_carry: &mut Option<Carry>,
         ctx: &RequestContext,
-    ) -> Result<Option<BatchOrEof>, QueryError>
+    ) -> Result<BatchOrEof, QueryError>
     where
         IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
     {
-        let mut batch = self.next_batch.take();
-        let mut batch_started_at: Option<std::time::Instant> = None;
         debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
 
-        let next_batch: Option<BatchedFeMessage> = loop {
-            let sleep_fut = match (self.server_side_batch_timeout, batch_started_at) {
-                (Some(batch_timeout), Some(started_at)) => futures::future::Either::Left(
-                    tokio::time::sleep_until((started_at + batch_timeout).into()),
-                ),
-                _ => futures::future::Either::Right(futures::future::pending()),
+        let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?
+
+        loop {
+            // Create a future that will become ready when we need to stop batching.
+            use futures::future::Either;
+            let batching_deadline = match (
+                &*maybe_carry as &Option<Carry>,
+                &mut batching_deadline_storage,
+            ) {
+                (None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
+                (None, Some(_)) => unreachable!(),
+                (Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
+                (Some(carry), None) => {
+                    match self.server_side_batch_timeout {
+                        None => {
+                            return Ok(BatchOrEof::Batch(smallvec::smallvec![
+                                maybe_carry
+                                    .take()
+                                    .expect("we already checked it's Some")
+                                    .msg
+                            ]))
+                        }
+                        Some(batch_timeout) => {
+                            // Take into consideration the time the carry spent waiting.
+                            let batch_timeout =
+                                batch_timeout.saturating_sub(carry.started_at.elapsed());
+                            if batch_timeout.is_zero() {
+                                // the timer doesn't support restarting with zero duration
+                                return Ok(BatchOrEof::Batch(smallvec::smallvec![
+                                    maybe_carry
+                                        .take()
+                                        .expect("we already checked it's Some")
+                                        .msg
+                                ]));
+                            } else {
+                                self.server_side_batch_timer.restart(batch_timeout);
+                                batching_deadline_storage = Some(&mut self.server_side_batch_timer);
+                                Either::Right(
+                                    batching_deadline_storage.as_mut().expect("we just set it"),
+                                )
+                            }
+                        }
+                    }
+                }
             };
 
             let msg = tokio::select! {
                 biased;
                 _ = self.cancel.cancelled() => {
                     return Err(QueryError::Shutdown)
                 }
-                msg = pgb.read_message() => {
-                    msg
-                }
-                _ = sleep_fut => {
-                    assert!(batch.is_some());
-                    break None;
+                _ = batching_deadline => {
+                    return Ok(BatchOrEof::Batch(smallvec::smallvec![maybe_carry.take().expect("per construction of batching_deadline").msg]));
                 }
+                msg = pgb.read_message() => { msg }
             };
 
+            let msg_start = Instant::now();
+
+            // Rest of this loop body is trying to batch `msg` into `batch`.
+            // If we can add msg to batch we continue into the next loop iteration.
+            // If we can't add msg to batch batch, we carry `msg` over to the next call.
+
            let copy_data_bytes = match msg? {
                Some(FeMessage::CopyData(bytes)) => bytes,
                Some(FeMessage::Terminate) => {
-                    return Ok(Some(BatchOrEof::Eof));
+                    return Ok(BatchOrEof::Eof);
                }
                Some(m) => {
                    return Err(QueryError::Other(anyhow::anyhow!(
@@ -656,10 +704,11 @@
                     )));
                 }
                 None => {
-                    return Ok(Some(BatchOrEof::Eof));
+                    return Ok(BatchOrEof::Eof);
                 } // client disconnected
             };
             trace!("query: {copy_data_bytes:?}");
 
             fail::fail_point!("ps::handle-pagerequest-message");
 
             // parse request
@@ -701,11 +750,11 @@
                     span,
                     error: $error,
                 };
-                let batch_and_error = match batch {
-                    Some(b) => smallvec::smallvec![b, error],
+                let batch_and_error = match maybe_carry.take() {
+                    Some(carry) => smallvec::smallvec![carry.msg, error],
                     None => smallvec::smallvec![error],
                 };
-                Ok(Some(BatchOrEof::Batch(batch_and_error)))
+                Ok(BatchOrEof::Batch(batch_and_error))
             }};
         }
 
@@ -758,26 +807,20 @@
                 }
             };
 
-            let batch_timeout = match self.server_side_batch_timeout {
-                Some(value) => value,
-                None => {
-                    // Batching is not enabled - stop on the first message.
-                    return Ok(Some(BatchOrEof::Batch(smallvec::smallvec![this_msg])));
-                }
-            };
-
-            // check if we can batch
-            match (&mut batch, this_msg) {
+            //
+            // batch
+            //
+            match (maybe_carry.as_mut(), this_msg) {
                 (None, this_msg) => {
-                    batch = Some(this_msg);
+                    *maybe_carry = Some(Carry { msg: this_msg, started_at: msg_start });
                 }
                 (
-                    Some(BatchedFeMessage::GetPage {
+                    Some(Carry { msg: BatchedFeMessage::GetPage {
                         span: _,
                         shard: accum_shard,
-                        pages: accum_pages,
+                        pages: ref mut accum_pages,
                         effective_request_lsn: accum_lsn,
-                    }),
+                    }, started_at: _}),
                     BatchedFeMessage::GetPage {
                         span: _,
                         shard: this_shard,
@@ -787,12 +830,14 @@
                 ) if async {
                     assert_eq!(this_pages.len(), 1);
                     if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
+                        trace!(%accum_lsn, %this_lsn, "stopping batching because of batch size");
                         assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
                         return false;
                     }
                     if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
                         != (this_shard.tenant_shard_id, this_shard.timeline_id)
                     {
+                        trace!(%accum_lsn, %this_lsn, "stopping batching because timeline object mismatch");
                         // TODO: we _could_ batch & execute each shard seperately (and in parallel).
                         // But the current logic for keeping responses in order does not support that.
                         return false;
@@ -800,6 +845,7 @@
                     // the vectored get currently only supports a single LSN, so, bounce as soon
                     // as the effective request_lsn changes
                     if *accum_lsn != this_lsn {
+                        trace!(%accum_lsn, %this_lsn, "stopping batching because LSN changed");
                         return false;
                     }
                     true
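The guard conditions above decide whether another GetPage request may be merged into the pending batch: the batch must not be full, the request must target the same tenant shard and timeline, and the effective request LSN must match. A simplified sketch of that predicate follows; the types, the `can_merge` name, and the 32-key cap are hypothetical stand-ins for the pageserver's real `BatchedFeMessage` and `Timeline::MAX_GET_VECTORED_KEYS`, whose value is not shown in this diff.

```rust
// Assumed cap, for illustration only.
const MAX_GET_VECTORED_KEYS: usize = 32;

#[derive(PartialEq, Eq, Clone, Copy)]
struct TimelineKey {
    tenant_shard_id: u64,
    timeline_id: u64,
}

struct Batch {
    key: TimelineKey,
    effective_request_lsn: u64,
    pages: Vec<u64>,
}

fn can_merge(batch: &Batch, key: TimelineKey, lsn: u64) -> bool {
    if batch.pages.len() >= MAX_GET_VECTORED_KEYS {
        return false; // batch is full
    }
    if batch.key != key {
        return false; // different shard or timeline object
    }
    // the vectored get supports only a single LSN per call
    batch.effective_request_lsn == lsn
}

fn main() {
    let key = TimelineKey { tenant_shard_id: 1, timeline_id: 7 };
    let batch = Batch { key, effective_request_lsn: 100, pages: vec![42] };
    assert!(can_merge(&batch, key, 100));
    assert!(!can_merge(&batch, key, 101)); // LSN changed: stop batching
}
```
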
@@ -809,21 +855,17 @@
                     // ok to batch
                     accum_pages.extend(this_pages);
                 }
-                (Some(_), this_msg) => {
+                (Some(carry), this_msg) => {
                     // by default, don't continue batching
-                    break Some(this_msg);
+                    let carry = std::mem::replace(carry,
+                        Carry {
+                            msg: this_msg,
+                            started_at: msg_start,
+                        });
+                    return Ok(BatchOrEof::Batch(smallvec::smallvec![carry.msg]));
                 }
             }
-
-            // batching impl piece
-            let started_at = batch_started_at.get_or_insert_with(Instant::now);
-            if started_at.elapsed() > batch_timeout {
-                break None;
-            }
-        };
-
-        self.next_batch = next_batch;
-        Ok(batch.map(|b| BatchOrEof::Batch(smallvec::smallvec![b])))
     }
 }
 
 /// Pagestream sub-protocol handler.
@@ -861,22 +903,17 @@
             }
         }
 
-        // If [`PageServerHandler`] is reused for multiple pagestreams,
-        // then make sure to not process requests from the previous ones.
-        self.next_batch = None;
+        let mut carry: Option<Carry> = None;
 
         loop {
             let maybe_batched = self
-                .read_batch_from_connection(pgb, &tenant_id, &timeline_id, &ctx)
+                .read_batch_from_connection(pgb, &tenant_id, &timeline_id, &mut carry, &ctx)
                 .await?;
             let batched = match maybe_batched {
-                Some(BatchOrEof::Batch(b)) => b,
-                Some(BatchOrEof::Eof) => {
+                BatchOrEof::Batch(b) => b,
+                BatchOrEof::Eof => {
                     break;
                 }
-                None => {
-                    continue;
-                }
             };
 
             for batch in batched {
@@ -922,6 +959,7 @@
                     (
                         {
                             let npages = pages.len();
+                            trace!(npages, "handling getpage request");
                             let res = self
                                 .handle_get_page_at_lsn_request_batched(
                                     &shard,
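Taken together, the batching changes above keep at most one unmergeable message as a `Carry` for the next call and flush the accumulated batch when the per-batch deadline fires, counting time the carried message already spent waiting. The sketch below shows the same idea in a self-contained form; it deliberately uses `tokio::time::sleep_until` and an mpsc channel in place of the `async-timer` crate and the Postgres connection, and all names (`read_batch`, `can_merge`, the `u32` messages) are illustrative, not the pageserver's API. It assumes tokio with the `full` feature set plus the `futures` crate.

```rust
use std::time::Duration;

use futures::future::Either;
use tokio::sync::mpsc;
use tokio::time::Instant;

struct Carry<T> {
    msg: T,
    started_at: Instant,
}

/// Read messages until the batch deadline fires (measured from the first message,
/// including time a carried-over message already waited) or until a message arrives
/// that cannot be merged; that message is carried into the next call. Returns None
/// once the sender is gone and nothing is buffered.
async fn read_batch(
    rx: &mut mpsc::Receiver<u32>,
    carry: &mut Option<Carry<u32>>,
    batch_timeout: Duration,
    can_merge: impl Fn(&[u32], &u32) -> bool,
) -> Option<Vec<u32>> {
    let mut batch: Vec<u32> = Vec::new();
    let mut deadline: Option<Instant> = None;
    if let Some(c) = carry.take() {
        deadline = Some(c.started_at + batch_timeout);
        batch.push(c.msg);
    }
    loop {
        // No deadline exists before the first message of a batch arrives.
        let deadline_fut = match deadline {
            Some(d) => Either::Left(tokio::time::sleep_until(d)),
            None => Either::Right(std::future::pending::<()>()),
        };
        tokio::select! {
            _ = deadline_fut => return Some(batch), // deadline hit: flush what we have
            msg = rx.recv() => match msg {
                None => return if batch.is_empty() { None } else { Some(batch) },
                Some(m) if batch.is_empty() || can_merge(batch.as_slice(), &m) => {
                    deadline.get_or_insert_with(|| Instant::now() + batch_timeout);
                    batch.push(m);
                }
                Some(m) => {
                    // Can't merge: flush the batch and carry `m` into the next call.
                    *carry = Some(Carry { msg: m, started_at: Instant::now() });
                    return Some(batch);
                }
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(16);
    tokio::spawn(async move {
        for i in [0u32, 2, 4, 1, 3, 5] {
            tx.send(i).await.unwrap();
        }
    });
    let mut carry = None;
    // Merge only messages of the same parity; flush after 1ms of batching.
    let same_parity = |batch: &[u32], m: &u32| batch[0] % 2 == m % 2;
    while let Some(batch) =
        read_batch(&mut rx, &mut carry, Duration::from_millis(1), same_parity).await
    {
        // Typically prints "batch: [0, 2, 4]" and then "batch: [1, 3, 5]".
        println!("batch: {batch:?}");
    }
}
```
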
@@ -15,6 +15,7 @@ Some handy pytest flags for local development:
 - `-k` selects a test to run
 - `--timeout=0` disables our default timeout of 300s (see `setup.cfg`)
 - `--preserve-database-files` to skip cleanup
+- `--out-dir` to produce a JSON with the recorded test metrics
 
 # What performance tests do we have and how we run them
 
@@ -36,6 +37,6 @@ All tests run only once. Usually to obtain more consistent performance numbers,
 
 ## Results collection
 
-Local test results for main branch, and results of daily performance tests, are stored in a neon project deployed in production environment. There is a Grafana dashboard that visualizes the results. Here is the [dashboard](https://observer.zenith.tech/d/DGKBm9Jnz/perf-test-results?orgId=1). The main problem with it is the unavailability to point at particular commit, though the data for that is available in the database. Needs some tweaking from someone who knows Grafana tricks.
+Local test results for main branch, and results of daily performance tests, are stored in a [neon project](https://console.neon.tech/app/projects/withered-sky-69117821) deployed in production environment. There is a Grafana dashboard that visualizes the results. Here is the [dashboard](https://observer.zenith.tech/d/DGKBm9Jnz/perf-test-results?orgId=1). The main problem with it is the unavailability to point at particular commit, though the data for that is available in the database. Needs some tweaking from someone who knows Grafana tricks.
 
 There is also an inconsistency in test naming. Test name should be the same across platforms, and results can be differentiated by the platform field. But currently, platform is sometimes included in test name because of the way how parametrization works in pytest. I.e. there is a platform switch in the dashboard with neon-local-ci and neon-staging variants. I.e. some tests under neon-local-ci value for a platform switch are displayed as `Test test_runner/performance/test_bulk_insert.py::test_bulk_insert[vanilla]` and `Test test_runner/performance/test_bulk_insert.py::test_bulk_insert[neon]` which is highly confusing.

@@ -0,0 +1,198 @@
import dataclasses
import time
from dataclasses import dataclass
from typing import Any, Optional

import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import humantime_to_ms

TARGET_RUNTIME = 5


@pytest.mark.parametrize(
    "tablesize_mib, batch_timeout, target_runtime, effective_io_concurrency, readhead_buffer_size, name",
    [
        # the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
        (50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"),
        (50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"),
        (50, "20us", TARGET_RUNTIME, 1, 128, "not batchable 20us timeout"),
        (50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"),
        # the next 4 cases demonstrate how batchable workloads benefit from batching
        (50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"),
        (50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"),
        (50, "20us", TARGET_RUNTIME, 100, 128, "batchable 20us timeout"),
        (50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"),
        (50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"),
    ],
)
def test_getpage_merge_smoke(
    neon_env_builder: NeonEnvBuilder,
    zenbenchmark: NeonBenchmarker,
    tablesize_mib: int,
    batch_timeout: Optional[str],
    target_runtime: int,
    effective_io_concurrency: int,
    readhead_buffer_size: int,
    name: str,
):
    """
    Do a bunch of sequential scans and ensure that the pageserver does some merging.
    """

    #
    # record perf-related parameters as metrics to simplify processing of results
    #
    params: dict[str, tuple[float | int, dict[str, Any]]] = {}

    params.update(
        {
            "tablesize_mib": (tablesize_mib, {"unit": "MiB"}),
            "batch_timeout": (
                -1 if batch_timeout is None else 1e3 * humantime_to_ms(batch_timeout),
                {"unit": "us"},
            ),
            # target_runtime is just a polite ask to the workload to run for this long
            "effective_io_concurrency": (effective_io_concurrency, {}),
            "readhead_buffer_size": (readhead_buffer_size, {}),
            # name is not a metric
        }
    )

    log.info("params: %s", params)

    for param, (value, kwargs) in params.items():
        zenbenchmark.record(
            param,
            metric_value=value,
            unit=kwargs.pop("unit", ""),
            report=MetricReport.TEST_PARAM,
            **kwargs,
        )

    #
    # Setup
    #

    env = neon_env_builder.init_start()
    ps_http = env.pageserver.http_client()
    endpoint = env.endpoints.create_start("main")
    conn = endpoint.connect()
    cur = conn.cursor()

    cur.execute("SET max_parallel_workers_per_gather=0")  # disable parallel backends
    cur.execute(f"SET effective_io_concurrency={effective_io_concurrency}")
    cur.execute(
        f"SET neon.readahead_buffer_size={readhead_buffer_size}"
    )  # this is the current default value, but let's hard-code that

    cur.execute("CREATE EXTENSION IF NOT EXISTS neon;")
    cur.execute("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")

    log.info("Filling the table")
    cur.execute("CREATE TABLE t (data char(1000)) with (fillfactor=10)")
    tablesize = tablesize_mib * 1024 * 1024
    npages = tablesize // (8 * 1024)
    cur.execute("INSERT INTO t SELECT generate_series(1, %s)", (npages,))
    # TODO: can we force postgres to do sequential scans?

    #
    # Run the workload, collect `Metrics` before and after, calculate difference, normalize.
    #

    @dataclass
    class Metrics:
        time: float
        pageserver_getpage_count: float
        pageserver_vectored_get_count: float
        compute_getpage_count: float
        pageserver_cpu_seconds_total: float

        def __sub__(self, other: "Metrics") -> "Metrics":
            return Metrics(
                time=self.time - other.time,
                pageserver_getpage_count=self.pageserver_getpage_count
                - other.pageserver_getpage_count,
                pageserver_vectored_get_count=self.pageserver_vectored_get_count
                - other.pageserver_vectored_get_count,
                compute_getpage_count=self.compute_getpage_count - other.compute_getpage_count,
                pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total
                - other.pageserver_cpu_seconds_total,
            )

        def normalize(self, by) -> "Metrics":
            return Metrics(
                time=self.time / by,
                pageserver_getpage_count=self.pageserver_getpage_count / by,
                pageserver_vectored_get_count=self.pageserver_vectored_get_count / by,
                compute_getpage_count=self.compute_getpage_count / by,
                pageserver_cpu_seconds_total=self.pageserver_cpu_seconds_total / by,
            )

    def get_metrics() -> Metrics:
        with conn.cursor() as cur:
            cur.execute(
                "select value from neon_perf_counters where metric='getpage_wait_seconds_count';"
            )
            compute_getpage_count = cur.fetchall()[0][0]
            pageserver_metrics = ps_http.get_metrics()
            return Metrics(
                time=time.time(),
                pageserver_getpage_count=pageserver_metrics.query_one(
                    "pageserver_smgr_query_seconds_count", {"smgr_query_type": "get_page_at_lsn"}
                ).value,
                pageserver_vectored_get_count=pageserver_metrics.query_one(
                    "pageserver_get_vectored_seconds_count", {"task_kind": "PageRequestHandler"}
                ).value,
                compute_getpage_count=compute_getpage_count,
                pageserver_cpu_seconds_total=pageserver_metrics.query_one(
                    "libmetrics_process_cpu_seconds_highres"
                ).value,
            )

    def workload() -> Metrics:
        start = time.time()
        iters = 0
        while time.time() - start < target_runtime or iters < 2:
            log.info("Seqscan %d", iters)
            if iters == 1:
                # round zero for warming up
                before = get_metrics()
            cur.execute(
                "select clear_buffer_cache()"
            )  # TODO: what about LFC? doesn't matter right now because LFC isn't enabled by default in tests
            cur.execute("select sum(data::bigint) from t")
            assert cur.fetchall()[0][0] == npages * (npages + 1) // 2
            iters += 1
        after = get_metrics()
        return (after - before).normalize(iters - 1)

    env.pageserver.patch_config_toml_nonrecursive({"server_side_batch_timeout": batch_timeout})
    env.pageserver.restart()
    metrics = workload()

    log.info("Results: %s", metrics)

    #
    # Sanity-checks on the collected data
    #
    # assert that getpage counts roughly match between compute and ps
    assert metrics.pageserver_getpage_count == pytest.approx(
        metrics.compute_getpage_count, rel=0.01
    )

    #
    # Record the results
    #

    for metric, value in dataclasses.asdict(metrics).items():
        zenbenchmark.record(f"counters.{metric}", value, unit="", report=MetricReport.TEST_PARAM)

    zenbenchmark.record(
        "perfmetric.batching_factor",
        metrics.pageserver_getpage_count / metrics.pageserver_vectored_get_count,
        unit="",
        report=MetricReport.HIGHER_IS_BETTER,
    )