Add performance regress test_ondemand_download_churn.py (#7242)

Add a performance regression test for on-demand download throughput.

Closes https://github.com/neondatabase/neon/issues/7146

Co-authored-by: Christian Schwarz <christian@neon.tech>
Co-authored-by: Alexander Bayandin <alexander@neon.tech>
Jure Bajic authored on 2024-05-15 18:41:12 +02:00, committed by GitHub
parent 3ef6e21211, commit affc18f912
3 changed files with 267 additions and 21 deletions

Changed file 1/3: pageserver_api models (HistoricLayerInfo)

@@ -745,6 +745,16 @@ impl HistoricLayerInfo {
         };
         *field = value;
     }
+    pub fn layer_file_size(&self) -> u64 {
+        match self {
+            HistoricLayerInfo::Delta {
+                layer_file_size, ..
+            } => *layer_file_size,
+            HistoricLayerInfo::Image {
+                layer_file_size, ..
+            } => *layer_file_size,
+        }
+    }
 }
 
 #[derive(Debug, Serialize, Deserialize)]

Changed file 2/3: pagebench ondemand_download_churn command

@@ -2,9 +2,11 @@ use pageserver_api::{models::HistoricLayerInfo, shard::TenantShardId};
 use pageserver_client::mgmt_api;
 use rand::seq::SliceRandom;
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, info};
 use utils::id::{TenantTimelineId, TimelineId};
 
+use std::{f64, sync::Arc};
+
 use tokio::{
     sync::{mpsc, OwnedSemaphorePermit},
     task::JoinSet,
@@ -12,10 +14,7 @@ use tokio::{
 };
 
 use std::{
     num::NonZeroUsize,
-    sync::{
-        atomic::{AtomicU64, Ordering},
-        Arc,
-    },
+    sync::atomic::{AtomicU64, Ordering},
     time::{Duration, Instant},
 };
@@ -51,19 +50,31 @@ pub(crate) fn main(args: Args) -> anyhow::Result<()> {
     Ok(())
 }
 
+#[derive(serde::Serialize)]
+struct Output {
+    downloads_count: u64,
+    downloads_bytes: u64,
+    evictions_count: u64,
+    timeline_restarts: u64,
+    #[serde(with = "humantime_serde")]
+    runtime: Duration,
+}
+
 #[derive(Debug, Default)]
 struct LiveStats {
-    evictions: AtomicU64,
-    downloads: AtomicU64,
+    evictions_count: AtomicU64,
+    downloads_count: AtomicU64,
+    downloads_bytes: AtomicU64,
     timeline_restarts: AtomicU64,
 }
 
 impl LiveStats {
     fn eviction_done(&self) {
-        self.evictions.fetch_add(1, Ordering::Relaxed);
+        self.evictions_count.fetch_add(1, Ordering::Relaxed);
     }
-    fn download_done(&self) {
-        self.downloads.fetch_add(1, Ordering::Relaxed);
+    fn download_done(&self, size: u64) {
+        self.downloads_count.fetch_add(1, Ordering::Relaxed);
+        self.downloads_bytes.fetch_add(size, Ordering::Relaxed);
     }
     fn timeline_restart_done(&self) {
         self.timeline_restarts.fetch_add(1, Ordering::Relaxed);
@@ -92,28 +103,49 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
     )
     .await?;
 
+    let token = CancellationToken::new();
     let mut tasks = JoinSet::new();
 
-    let live_stats = Arc::new(LiveStats::default());
+    let periodic_stats = Arc::new(LiveStats::default());
+    let total_stats = Arc::new(LiveStats::default());
+    let start = Instant::now();
 
     tasks.spawn({
-        let live_stats = Arc::clone(&live_stats);
+        let periodic_stats = Arc::clone(&periodic_stats);
+        let total_stats = Arc::clone(&total_stats);
+        let cloned_token = token.clone();
         async move {
             let mut last_at = Instant::now();
             loop {
+                if cloned_token.is_cancelled() {
+                    return;
+                }
                 tokio::time::sleep_until((last_at + Duration::from_secs(1)).into()).await;
                 let now = Instant::now();
                 let delta: Duration = now - last_at;
                 last_at = now;
 
                 let LiveStats {
-                    evictions,
-                    downloads,
+                    evictions_count,
+                    downloads_count,
+                    downloads_bytes,
                     timeline_restarts,
-                } = &*live_stats;
-                let evictions = evictions.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
-                let downloads = downloads.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64();
+                } = &*periodic_stats;
+                let evictions_count = evictions_count.swap(0, Ordering::Relaxed);
+                let downloads_count = downloads_count.swap(0, Ordering::Relaxed);
+                let downloads_bytes = downloads_bytes.swap(0, Ordering::Relaxed);
                 let timeline_restarts = timeline_restarts.swap(0, Ordering::Relaxed);
-                info!("evictions={evictions:.2}/s downloads={downloads:.2}/s timeline_restarts={timeline_restarts}");
+
+                total_stats.evictions_count.fetch_add(evictions_count, Ordering::Relaxed);
+                total_stats.downloads_count.fetch_add(downloads_count, Ordering::Relaxed);
+                total_stats.downloads_bytes.fetch_add(downloads_bytes, Ordering::Relaxed);
+                total_stats.timeline_restarts.fetch_add(timeline_restarts, Ordering::Relaxed);
+
+                let evictions_per_s = evictions_count as f64 / delta.as_secs_f64();
+                let downloads_per_s = downloads_count as f64 / delta.as_secs_f64();
+                let downloads_mibs_per_s = downloads_bytes as f64 / delta.as_secs_f64() / ((1 << 20) as f64);
+                info!("evictions={evictions_per_s:.2}/s downloads={downloads_per_s:.2}/s download_bytes={downloads_mibs_per_s:.2}MiB/s timeline_restarts={timeline_restarts}");
             }
         }
    });
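The reporting task above drains the per-interval counters with swap(0) once per second, folds them into the run totals, and turns the drained counts into rates. A minimal sketch of that rate math, with hypothetical counter values (Python for brevity):

    # Hypothetical values drained from the per-interval atomics for one ~1s tick.
    delta_secs = 1.0                 # measured interval length, seconds
    downloads_count = 42             # downloads completed in this interval
    downloads_bytes = 440_401_920    # bytes downloaded in the same interval
    downloads_per_s = downloads_count / delta_secs
    downloads_mibs_per_s = downloads_bytes / delta_secs / (1 << 20)
    print(f"downloads={downloads_per_s:.2f}/s download_bytes={downloads_mibs_per_s:.2f}MiB/s")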
@@ -124,14 +156,42 @@ async fn main_impl(args: Args) -> anyhow::Result<()> {
                 args,
                 Arc::clone(&mgmt_api_client),
                 tl,
-                Arc::clone(&live_stats),
+                Arc::clone(&periodic_stats),
+                token.clone(),
             ));
         }
     }
 
+    if let Some(runtime) = args.runtime {
+        tokio::spawn(async move {
+            tokio::time::sleep(runtime.into()).await;
+            token.cancel();
+        });
+    }
+
     while let Some(res) = tasks.join_next().await {
         res.unwrap();
     }
+
+    let end = Instant::now();
+    let duration: Duration = end - start;
+    let output = {
+        let LiveStats {
+            evictions_count,
+            downloads_count,
+            downloads_bytes,
+            timeline_restarts,
+        } = &*total_stats;
+        Output {
+            downloads_count: downloads_count.load(Ordering::Relaxed),
+            downloads_bytes: downloads_bytes.load(Ordering::Relaxed),
+            evictions_count: evictions_count.load(Ordering::Relaxed),
+            timeline_restarts: timeline_restarts.load(Ordering::Relaxed),
+            runtime: duration,
+        }
+    };
+    let output = serde_json::to_string_pretty(&output).unwrap();
+    println!("{output}");
+
     Ok(())
 }
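At the end of a run, main_impl serializes the accumulated totals into the Output struct and pretty-prints them to stdout; the new Python test parses exactly this JSON. A hypothetical example of the shape (field names come from the Output struct above, the values are made up, and runtime uses the humantime format produced by humantime_serde):

    # Hypothetical pagebench output as consumed by the test below.
    example_output = {
        "downloads_count": 3000,
        "downloads_bytes": 31457280000,
        "evictions_count": 2990,
        "timeline_restarts": 12,
        "runtime": "30s",
    }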
@@ -140,6 +200,7 @@ async fn timeline_actor(
     mgmt_api_client: Arc<pageserver_client::mgmt_api::Client>,
     timeline: TenantTimelineId,
     live_stats: Arc<LiveStats>,
+    token: CancellationToken,
 ) {
     // TODO: support sharding
     let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id);
@@ -149,7 +210,7 @@ async fn timeline_actor(
         layers: Vec<mpsc::Sender<OwnedSemaphorePermit>>,
         concurrency: Arc<tokio::sync::Semaphore>,
     }
-    loop {
+    while !token.is_cancelled() {
         debug!("restarting timeline");
         let layer_map_info = mgmt_api_client
             .layer_map_info(tenant_shard_id, timeline.timeline_id)
@@ -185,7 +246,7 @@ async fn timeline_actor(
         live_stats.timeline_restart_done();
 
-        loop {
+        while !token.is_cancelled() {
             assert!(!timeline.joinset.is_empty());
             if let Some(res) = timeline.joinset.try_join_next() {
                 debug!(?res, "a layer actor exited, should not happen");
@@ -255,7 +316,7 @@ async fn layer_actor(
                 .layer_ondemand_download(tenant_shard_id, timeline_id, layer.layer_file_name())
                 .await
                 .unwrap();
-            live_stats.download_done();
+            live_stats.download_done(layer.layer_file_size());
             did_it
         }
     };

Changed file 3/3: test_ondemand_download_churn.py (new performance test)

@@ -0,0 +1,175 @@
import json
from pathlib import Path
from typing import Any, Dict, Tuple

import pytest

from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import s3_storage
from fixtures.utils import humantime_to_ms


@pytest.mark.parametrize("duration", [30])
@pytest.mark.parametrize("io_engine", ["tokio-epoll-uring", "std-fs"])
@pytest.mark.parametrize("concurrency_per_target", [1, 10, 100])
@pytest.mark.timeout(1000)
def test_download_churn(
    neon_env_builder: NeonEnvBuilder,
    zenbenchmark: NeonBenchmarker,
    pg_bin: PgBin,
    io_engine: str,
    concurrency_per_target: int,
    duration: int,
):
    def record(metric, **kwargs):
        zenbenchmark.record(metric_name=f"pageserver_ondemand_download_churn.{metric}", **kwargs)

    params: Dict[str, Tuple[Any, Dict[str, Any]]] = {}

    # params from fixtures
    params.update(
        {
            # we don't capture `duration`, but instead use the `runtime` output field from pagebench
        }
    )

    # configure cache sizes like in prod
    page_cache_size = 16384
    max_file_descriptors = 500000
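    # 16384 pages * 8 KiB per page = 128 MiB of page cache; the param records
    # below convert to bytes using the pageserver's 8192-byte page size.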
    neon_env_builder.pageserver_config_override = (
        f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
    )
    params.update(
        {
            "pageserver_config_override.page_cache_size": (
                page_cache_size * 8192,
                {"unit": "byte"},
            ),
            "pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
        }
    )

    for param, (value, kwargs) in params.items():
        record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)

    # Setup env
    env = setup_env(neon_env_builder, pg_bin)
    env.pageserver.allowed_errors.append(
        f".*path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
    )

    run_benchmark(env, pg_bin, record, io_engine, concurrency_per_target, duration)


def setup_env(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    remote_storage_kind = s3_storage()
    neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)

    # We configure tenant conf such that SQL query below produces a lot of layers.
    # We don't care what's in the layers really, we just care that layers are created.
    bytes_per_layer = 10 * (1024**2)
    env = neon_env_builder.init_start(
        initial_tenant_conf={
            "pitr_interval": "1000d",  # let's not make it get in the way
            "gc_period": "0s",  # disable periodic gc to avoid noise
            "compaction_period": "0s",  # disable L0=>L1 compaction
            "checkpoint_timeout": "10years",  # rely solely on checkpoint_distance
            "checkpoint_distance": bytes_per_layer,  # 10M instead of 256M to create more smaller layers
            "image_creation_threshold": 100000,  # don't create image layers ever
        }
    )

    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline
    client = env.pageserver.http_client()

    with env.endpoints.create_start("main", tenant_id=tenant_id) as ep:
        ep.safe_psql("CREATE TABLE data (random_text text)")
        bytes_per_row = 512  # make big enough so WAL record size doesn't dominate
        desired_layers = 300
        desired_bytes = bytes_per_layer * desired_layers
        nrows = desired_bytes / bytes_per_row
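        # Sizing: 10 MiB per layer * 300 layers = 3000 MiB of table data;
        # at ~512 bytes per row that is ~6.1 million rows.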
        ep.safe_psql(
            f"INSERT INTO data SELECT lpad(i::text, {bytes_per_row}, '0') FROM generate_series(1, {int(nrows)}) as i",
            options="-c statement_timeout=0",
        )
        wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
        # TODO: this is a bit imprecise, there could be frozen layers being written out that we don't observe here
        wait_for_upload_queue_empty(client, tenant_id, timeline_id)

    return env


def run_benchmark(
    env: NeonEnv,
    pg_bin: PgBin,
    record,
    io_engine: str,
    concurrency_per_target: int,
    duration_secs: int,
):
    ps_http = env.pageserver.http_client()
    cmd = [
        str(env.neon_binpath / "pagebench"),
        "ondemand-download-churn",
        "--mgmt-api-endpoint",
        ps_http.base_url,
        "--runtime",
        f"{duration_secs}s",
        "--set-io-engine",
        f"{io_engine}",
        "--concurrency-per-target",
        f"{concurrency_per_target}",
        # don't specify the targets explicitly, let pagebench auto-discover them
    ]
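    # Assembled, the invocation looks roughly like this (endpoint value is
    # hypothetical; in the test it comes from ps_http.base_url):
    #   pagebench ondemand-download-churn --mgmt-api-endpoint http://127.0.0.1:9898 \
    #     --runtime 30s --set-io-engine tokio-epoll-uring --concurrency-per-target 10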
log.info(f"command: {' '.join(cmd)}")
basepath = pg_bin.run_capture(cmd, with_command_header=False)
results_path = Path(basepath + ".stdout")
log.info(f"Benchmark results at: {results_path}")
with open(results_path, "r") as f:
results = json.load(f)
log.info(f"Results:\n{json.dumps(results, sort_keys=True, indent=2)}")
metric = "downloads_count"
record(
metric,
metric_value=results[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "downloads_bytes"
record(
metric,
metric_value=results[metric],
unit="byte",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "evictions_count"
record(
metric,
metric_value=results[metric],
unit="",
report=MetricReport.HIGHER_IS_BETTER,
)
metric = "timeline_restarts"
record(
metric,
metric_value=results[metric],
unit="",
report=MetricReport.LOWER_IS_BETTER,
)
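
    # pagebench serializes `runtime` in humantime format (e.g. "30s");
    # humantime_to_ms converts that to milliseconds, hence the division
    # by 1000 to record seconds.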
metric = "runtime"
record(
metric,
metric_value=humantime_to_ms(results[metric]) / 1000,
unit="s",
report=MetricReport.TEST_PARAM,
)