mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 14:40:37 +00:00
Compare commits
7 Commits
sk-basic-b
...
problame/i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d23ea718ee | ||
|
|
73a7ca38b3 | ||
|
|
7eb1d4cfa6 | ||
|
|
6ebd683327 | ||
|
|
b1ecdfe099 | ||
|
|
82a74d0e77 | ||
|
|
49b43c75e2 |
@@ -66,12 +66,12 @@ impl serde::Serialize for LatencyPercentiles {
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
|
||||
for p in LATENCY_PERCENTILES {
|
||||
for (i, p) in LATENCY_PERCENTILES.iter().enumerate() {
|
||||
ser.serialize_entry(
|
||||
&format!("p{p}"),
|
||||
&format!(
|
||||
"{}",
|
||||
&humantime::format_duration(self.latency_percentiles[0])
|
||||
&humantime::format_duration(self.latency_percentiles[i])
|
||||
),
|
||||
)?;
|
||||
}
|
||||
|
||||
@@ -199,7 +199,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
|
||||
// Perhaps we did no work and the walredo process has been idle for some time:
|
||||
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
|
||||
tenant.walredo_mgr.maybe_quiesce(period * 10);
|
||||
tenant.walredo_mgr.maybe_quiesce(period * 10); // TODO: broken with compaction_period 0
|
||||
|
||||
// Sleep
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
|
||||
@@ -21,7 +21,6 @@
|
||||
use anyhow::Context;
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use nix::poll::*;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use serde::Serialize;
|
||||
use std::collections::VecDeque;
|
||||
@@ -31,10 +30,11 @@ use std::ops::{Deref, DerefMut};
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::os::unix::prelude::CommandExt;
|
||||
use std::process::Stdio;
|
||||
use std::process::{Child, ChildStdin, ChildStdout, Command};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
|
||||
use std::process::{Child, Command};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tracing::*;
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock};
|
||||
|
||||
@@ -73,12 +73,12 @@ pub(crate) struct BufferTag {
|
||||
}
|
||||
|
||||
struct ProcessInput {
|
||||
stdin: ChildStdin,
|
||||
stdin: tokio::process::ChildStdin,
|
||||
n_requests: usize,
|
||||
}
|
||||
|
||||
struct ProcessOutput {
|
||||
stdout: ChildStdout,
|
||||
stdout: tokio::process::ChildStdout,
|
||||
pending_responses: VecDeque<Option<Bytes>>,
|
||||
n_processed_responses: usize,
|
||||
}
|
||||
@@ -157,6 +157,7 @@ impl PostgresRedoManager {
|
||||
self.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
)
|
||||
.await
|
||||
};
|
||||
img = Some(result?);
|
||||
|
||||
@@ -177,6 +178,7 @@ impl PostgresRedoManager {
|
||||
self.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -217,7 +219,7 @@ impl PostgresRedoManager {
|
||||
/// Process one request for WAL redo using wal-redo postgres
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn apply_batch_postgres(
|
||||
async fn apply_batch_postgres(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
@@ -270,6 +272,7 @@ impl PostgresRedoManager {
|
||||
let buf_tag = BufferTag { rel, blknum };
|
||||
let result = proc
|
||||
.apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
|
||||
.await
|
||||
.context("apply_wal_records");
|
||||
|
||||
let duration = started_at.elapsed();
|
||||
@@ -647,8 +650,8 @@ struct WalRedoProcess {
|
||||
tenant_shard_id: TenantShardId,
|
||||
// Some() on construction, only becomes None on Drop.
|
||||
child: Option<NoLeakChild>,
|
||||
stdout: Mutex<ProcessOutput>,
|
||||
stdin: Mutex<ProcessInput>,
|
||||
stdout: tokio::sync::Mutex<ProcessOutput>,
|
||||
stdin: tokio::sync::Mutex<ProcessInput>,
|
||||
/// Counter to separate same sized walredo inputs failing at the same millisecond.
|
||||
#[cfg(feature = "testing")]
|
||||
dump_sequence: AtomicUsize,
|
||||
@@ -754,12 +757,12 @@ impl WalRedoProcess {
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
child: Some(child),
|
||||
stdin: Mutex::new(ProcessInput {
|
||||
stdin,
|
||||
stdin: tokio::sync::Mutex::new(ProcessInput {
|
||||
stdin: tokio::process::ChildStdin::from_std(stdin).unwrap(), // TODO error handling
|
||||
n_requests: 0,
|
||||
}),
|
||||
stdout: Mutex::new(ProcessOutput {
|
||||
stdout,
|
||||
stdout: tokio::sync::Mutex::new(ProcessOutput {
|
||||
stdout: tokio::process::ChildStdout::from_std(stdout).unwrap(), // TODO error handling
|
||||
pending_responses: VecDeque::new(),
|
||||
n_processed_responses: 0,
|
||||
}),
|
||||
@@ -779,15 +782,13 @@ impl WalRedoProcess {
|
||||
// new page image.
|
||||
//
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
|
||||
fn apply_wal_records(
|
||||
async fn apply_wal_records(
|
||||
&self,
|
||||
tag: BufferTag,
|
||||
base_img: &Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let input = self.stdin.lock().unwrap();
|
||||
|
||||
// Serialize all the messages to send the WAL redo process first.
|
||||
//
|
||||
// This could be problematic if there are millions of records to replay,
|
||||
@@ -816,7 +817,7 @@ impl WalRedoProcess {
|
||||
build_get_page_msg(tag, &mut writebuf);
|
||||
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
|
||||
|
||||
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
|
||||
let res = self.apply_wal_records0(&writebuf, wal_redo_timeout).await;
|
||||
|
||||
if res.is_err() {
|
||||
// not all of these can be caused by this particular input, however these are so rare
|
||||
@@ -827,38 +828,17 @@ impl WalRedoProcess {
|
||||
res
|
||||
}
|
||||
|
||||
fn apply_wal_records0(
|
||||
async fn apply_wal_records0(
|
||||
&self,
|
||||
writebuf: &[u8],
|
||||
input: MutexGuard<ProcessInput>,
|
||||
wal_redo_timeout: Duration,
|
||||
_wal_redo_timeout: Duration, // TODO respect
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let input = self.stdin.lock().await;
|
||||
|
||||
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
|
||||
let mut nwrite = 0usize;
|
||||
|
||||
while nwrite < writebuf.len() {
|
||||
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
|
||||
let n = loop {
|
||||
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
proc.stdin.write_all(writebuf).await.unwrap(); // TODO: bring back timeout & error handling
|
||||
|
||||
if n == 0 {
|
||||
anyhow::bail!("WAL redo timed out");
|
||||
}
|
||||
|
||||
// If 'stdin' is writeable, do write.
|
||||
let in_revents = stdin_pollfds[0].revents().unwrap();
|
||||
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
|
||||
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
|
||||
}
|
||||
if in_revents.contains(PollFlags::POLLHUP) {
|
||||
// We still have more data to write, but the process closed the pipe.
|
||||
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
|
||||
}
|
||||
}
|
||||
let request_no = proc.n_requests;
|
||||
proc.n_requests += 1;
|
||||
drop(proc);
|
||||
@@ -875,40 +855,13 @@ impl WalRedoProcess {
|
||||
// pending responses ring buffer and truncate all empty elements from the front,
|
||||
// advancing processed responses number.
|
||||
|
||||
let mut output = self.stdout.lock().unwrap();
|
||||
let mut output = self.stdout.lock().await;
|
||||
let n_processed_responses = output.n_processed_responses;
|
||||
while n_processed_responses + output.pending_responses.len() <= request_no {
|
||||
// We expect the WAL redo process to respond with an 8k page image. We read it
|
||||
// into this buffer.
|
||||
let mut resultbuf = vec![0; BLCKSZ.into()];
|
||||
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
|
||||
while nresult < BLCKSZ.into() {
|
||||
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
|
||||
// We do two things simultaneously: reading response from stdout
|
||||
// and forward any logging information that the child writes to its stderr to the page server's log.
|
||||
let n = loop {
|
||||
match nix::poll::poll(
|
||||
&mut stdout_pollfds[..],
|
||||
wal_redo_timeout.as_millis() as i32,
|
||||
) {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
|
||||
if n == 0 {
|
||||
anyhow::bail!("WAL redo timed out");
|
||||
}
|
||||
|
||||
// If we have some data in stdout, read it to the result buffer.
|
||||
let out_revents = stdout_pollfds[0].revents().unwrap();
|
||||
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
|
||||
}
|
||||
if out_revents.contains(PollFlags::POLLHUP) {
|
||||
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
|
||||
}
|
||||
}
|
||||
output.stdout.read_exact(&mut resultbuf).await.unwrap();
|
||||
output
|
||||
.pending_responses
|
||||
.push_back(Some(Bytes::from(resultbuf)));
|
||||
|
||||
36
results.txt
Normal file
36
results.txt
Normal file
@@ -0,0 +1,36 @@
|
||||
run on i3en.3xlarge
|
||||
|
||||
admin@ip-172-31-13-23:[~/neon-main]: du -hs /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1000-6/snapshot/local_fs_remote_storage/pageserver/tenants
|
||||
225G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1000-6/snapshot/local_fs_remote_storage/pageserver/tenants
|
||||
|
||||
=> ~2.25x main memory
|
||||
|
||||
admin@ip-172-31-13-23:[~/neon-main]: NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py
|
||||
|
||||
--------------------------------------------------------------------------------- Benchmark results ---------------------------------------------------------------------------------
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.n_tenants: 1000
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pgbench_scale: 6
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.duration: 30 s
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.page_cache_size: 134217728 byte
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.max_file_descriptors: 500000
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config.override.virtual_file_io_engine: IoEngine.STD_FS
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.request_count: 2321
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_mean: 8,785.440 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p95: 20,234.239 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99: 20,234.239 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.9: 20,234.239 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.99: 20,234.239 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.n_tenants: 1000
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pgbench_scale: 6
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.duration: 30 s
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.page_cache_size: 134217728 byte
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.max_file_descriptors: 500000
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config.override.virtual_file_io_engine: IoEngine.TOKIO_EPOLL_URING
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.request_count: 2200
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_mean: 9,046.271 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p95: 16,457.727 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99: 16,457.727 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.9: 16,457.727 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.99: 16,457.727 ms
|
||||
|
||||
=========================================================================== 2 passed in 142.33s (0:02:22) ===========================================================================
|
||||
@@ -20,10 +20,10 @@ fi
|
||||
|
||||
# do all the on-disk initialization work now instead of a background kernel thread
|
||||
# so that we're ready for benchmarking right after this line
|
||||
sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 /dev/nvme1n1
|
||||
#sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 /dev/nvme1n1
|
||||
|
||||
MOUNTPOINT=/instance_store
|
||||
sudo mkdir "$MOUNTPOINT"
|
||||
sudo rmdir "$MOUNTPOINT" || sudo mkdir "$MOUNTPOINT"
|
||||
sudo mount /dev/nvme1n1 "$MOUNTPOINT"
|
||||
sudo chown -R "$(id -u)":"$(id -g)" "$MOUNTPOINT"
|
||||
|
||||
@@ -40,7 +40,7 @@ To run your local neon.git build on the instance store volume,
|
||||
run the following commands from the top of the neon.git checkout
|
||||
|
||||
# raise file descriptor limit of your shell and its child processes
|
||||
sudo prlimit -p $$ --nofile=800000:800000
|
||||
sudo prlimit -p \$\$ --nofile=800000:800000
|
||||
|
||||
# test suite run
|
||||
export TEST_OUTPUT="$TEST_OUTPUT"
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
import enum
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Tuple
|
||||
|
||||
import toml
|
||||
|
||||
import fixtures.pageserver.many_tenants as many_tenants
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
@@ -17,6 +20,10 @@ from fixtures.utils import get_scale_for_db, humantime_to_ms
|
||||
from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
|
||||
|
||||
|
||||
class IoEngine(str, enum.Enum):
|
||||
STD_FS = "std-fs"
|
||||
TOKIO_EPOLL_URING = "tokio-epoll-uring"
|
||||
|
||||
# For reference, the space usage of the snapshots:
|
||||
# admin@ip-172-31-13-23:[~/neon-main]: sudo du -hs /instance_store/test_output/shared-snapshots
|
||||
# 137G /instance_store/test_output/shared-snapshots
|
||||
@@ -27,9 +34,10 @@ from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
|
||||
# 5.1G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-10-6
|
||||
# 76G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-13
|
||||
# 46G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-6
|
||||
@pytest.mark.parametrize("ioengine", [IoEngine.STD_FS, IoEngine.TOKIO_EPOLL_URING])
|
||||
@pytest.mark.parametrize("duration", [30])
|
||||
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100, 200]])
|
||||
@pytest.mark.parametrize("n_tenants", [1, 10])
|
||||
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100]])
|
||||
@pytest.mark.parametrize("n_tenants", [1000])
|
||||
@pytest.mark.timeout(
|
||||
10000
|
||||
) # TODO: this value is just "a really high number"; have this per instance type
|
||||
@@ -40,6 +48,7 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
|
||||
n_tenants: int,
|
||||
pgbench_scale: int,
|
||||
duration: int,
|
||||
ioengine: IoEngine,
|
||||
):
|
||||
def record(metric, **kwargs):
|
||||
zenbenchmark.record(
|
||||
@@ -60,9 +69,12 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
|
||||
# configure cache sizes like in prod
|
||||
page_cache_size = 16384
|
||||
max_file_descriptors = 500000
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
|
||||
)
|
||||
pageserver_config_override = {
|
||||
"page_cache_size": f"{page_cache_size}",
|
||||
"max_file_descriptors": f"{max_file_descriptors}",
|
||||
"virtual_file_io_engine": f"\"{ioengine}\"",
|
||||
}
|
||||
neon_env_builder.pageserver_config_override = ";".join([f"{k}={v}" for k, v in pageserver_config_override.items()])
|
||||
params.update(
|
||||
{
|
||||
"pageserver_config_override.page_cache_size": (
|
||||
@@ -70,12 +82,17 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
|
||||
{"unit": "byte"},
|
||||
),
|
||||
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
|
||||
"pageserver_config.override.virtual_file_io_engine": (ioengine, {"unit": ""}),
|
||||
}
|
||||
)
|
||||
|
||||
for param, (value, kwargs) in params.items():
|
||||
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
|
||||
env = setup_pageserver_with_pgbench_tenants(neon_env_builder, pg_bin, n_tenants, pgbench_scale)
|
||||
ps_http =env.pageserver.http_client()
|
||||
for tenant_info in ps_http.tenant_list():
|
||||
tenant_id = tenant_info["id"]
|
||||
ps_http.patch_tenant_config_client_side(tenant_id, {"compaction_period": "10s"})
|
||||
run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user