Compare commits

...

68 Commits

Author SHA1 Message Date
Christian Schwarz
bec00d17c3 benchmark numbers 2024-04-10 15:57:18 +00:00
Christian Schwarz
6c5c2c80d2 Revert "adjust bench for both sync and async benchmarking"
This reverts commit afd2c1369e.
2024-04-10 15:43:08 +00:00
Christian Schwarz
c9e8ae40dd Revert "benchmark numbers"
This reverts commit 6e0a02de27.
2024-04-09 11:35:47 +00:00
Christian Schwarz
0047481cc8 Revert "Revert "Revert "HACK: restore old impl, make runtime configurable (how to: reconfigure via HTTP, then kill existing walredo procs)"""
This reverts commit bea2e121dd.
2024-04-09 11:35:07 +00:00
Christian Schwarz
feaca7bc1e Revert "fixup: re-apply bring-back of wal_redo_timeout changes after file movements"
This reverts commit f489a10509.
2024-04-09 11:32:56 +00:00
Christian Schwarz
5d12475664 Revert "Revert "Revert "make the default process kind runtime-configurable, and switch to sync"""
This reverts commit 825c0e30e8.
2024-04-09 11:32:47 +00:00
Christian Schwarz
6e0a02de27 benchmark numbers 2024-04-09 10:59:29 +00:00
Christian Schwarz
afd2c1369e adjust bench for both sync and async benchmarking 2024-04-09 11:21:16 +00:00
Christian Schwarz
b44bbd276e HACK: set walredo process kind metric on startup 2024-04-09 10:33:57 +00:00
Christian Schwarz
aad5a672f0 DO NOT MERGE: disable materialized page cache for benchmarking 2024-04-08 15:42:50 +00:00
Christian Schwarz
5565087dba Merge remote-tracking branch 'origin/problame/async-walredo/benchmarking-2024-04-08--1' into problame/async-walredo/benchmarking-2024-04-08--1 2024-04-08 15:40:34 +00:00
Christian Schwarz
825c0e30e8 Revert "Revert "make the default process kind runtime-configurable, and switch to sync""
This reverts commit b72891d28c.
2024-04-08 15:36:34 +00:00
Christian Schwarz
f489a10509 fixup: re-apply bring-back of wal_redo_timeout changes after file movements 2024-04-08 15:26:12 +00:00
Christian Schwarz
bea2e121dd Revert "Revert "HACK: restore old impl, make runtime configurable (how to: reconfigure via HTTP, then kill existing walredo procs)""
This reverts commit c38b3e6ad6.
2024-04-08 15:14:40 +00:00
Christian Schwarz
82e7e4d84a DO NOT MERGE: benchmarking setup 2024-04-08 14:58:41 +00:00
Christian Schwarz
4ef2fb29fa bring back wal_redo_timeout 2024-04-08 14:53:52 +00:00
Christian Schwarz
ffef90f3db Merge remote-tracking branch 'origin/main' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-04-08 14:31:48 +00:00
Christian Schwarz
d8a926618e tokio-test not necessary 2024-04-08 14:26:22 +00:00
Christian Schwarz
c38b3e6ad6 Revert "HACK: restore old impl, make runtime configurable (how to: reconfigure via HTTP, then kill existing walredo procs)"
This reverts commit cca66e5e82.
2024-04-08 14:14:07 +00:00
Christian Schwarz
b72891d28c Revert "make the default process kind runtime-configurable, and switch to sync"
This reverts commit 67a7abc7cf.
2024-04-08 14:08:36 +00:00
Christian Schwarz
5efaddea02 Merge remote-tracking branch 'origin/problame/configurable-one-runtime' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-04-08 14:03:18 +00:00
Christian Schwarz
aa5439cb6e Merge remote-tracking branch 'origin/main' into problame/configurable-one-runtime 2024-04-08 12:24:43 +00:00
Christian Schwarz
dc8e318a42 fix copy-pasta 2024-04-05 17:58:21 +00:00
Christian Schwarz
871a3caca9 change thread name 2024-04-05 17:58:03 +00:00
Christian Schwarz
edd7f69c2d make current_thread mode work
We need to have &'static Runtime, not &'static Handle, because
&'static Handle doesn't drive IO/timers on current_thread RT.
2024-04-05 17:51:04 +00:00
Christian Schwarz
70fb7e3580 metric, useful for rollout / analyzing grafana metrics 2024-04-05 17:34:11 +00:00
Christian Schwarz
6b820bb423 fixup env var value parsing 2024-04-05 16:42:44 +00:00
Christian Schwarz
740efb0ab5 cleanup 2024-04-05 17:22:06 +02:00
Christian Schwarz
5cf45df692 remove env_config::Bool 2024-04-05 17:22:06 +02:00
Christian Schwarz
3779854f12 rename "single runtime" to "one runtime", allow configuring current_thread and multi_thread:$num_workers 2024-04-05 17:22:06 +02:00
Christian Schwarz
dc03f7a44f pageserver: ability to use a single runtime
This PR allows running the pageserver with a single tokio runtime.
2024-04-05 17:22:06 +02:00
Christian Schwarz
43cf9d10d2 env_config improvements 2024-04-05 17:22:06 +02:00
Christian Schwarz
31d4d1e233 env_config from PR #6125 2024-04-05 17:22:06 +02:00
Christian Schwarz
c77ce7cdc3 Merge remote-tracking branch 'origin/main' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-04-03 21:11:27 +02:00
Christian Schwarz
67a7abc7cf make the default process kind runtime-configurable, and switch to sync 2024-04-03 21:06:02 +02:00
Christian Schwarz
cca66e5e82 HACK: restore old impl, make runtime configurable (how to: reconfigure via HTTP, then kill existing walredo procs) 2024-04-03 20:34:33 +02:00
Christian Schwarz
655d3b6468 audit for cancellation-safety 2024-03-21 15:38:45 +00:00
Christian Schwarz
3dfc7de99b use chrono::DateTime for Poisoned errors 2024-03-21 15:31:24 +00:00
Christian Schwarz
3a5994bb43 Merge branch 'main' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-03-21 14:26:05 +00:00
Christian Schwarz
86b0df973a apply review suggestion https://github.com/neondatabase/neon/pull/7190#discussion_r1533884248 2024-03-21 13:23:08 +00:00
Christian Schwarz
e669b6d852 Merge branch 'problame/async-walredo/better-benchmark' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-03-21 13:05:06 +00:00
Christian Schwarz
b2f5b84c2f cargo fmt 2024-03-21 13:00:55 +00:00
Christian Schwarz
a21409b509 measure results 2024-03-21 12:57:21 +00:00
Christian Schwarz
c6a74bd17f Merge branch 'problame/async-walredo/better-benchmark' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-03-21 12:31:05 +00:00
Christian Schwarz
d6c45625e6 update numbers (the yield makes a big difference, who would have thunken) 2024-03-21 12:30:10 +00:00
Christian Schwarz
db3333eecb yield after every redo execution 2024-03-21 10:53:32 +00:00
Christian Schwarz
15cfa7b6e9 apply review suggestions 2024-03-21 10:53:26 +00:00
Christian Schwarz
8677136bc8 Merge branch 'problame/async-walredo/better-benchmark' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-03-20 18:11:28 +00:00
Christian Schwarz
a37d713f2b Merge branch 'main' into problame/async-walredo/better-benchmark 2024-03-20 18:11:13 +00:00
Christian Schwarz
081af38076 Merge branch 'main' into problame/async-walredo/better-benchmark 2024-03-20 18:06:06 +00:00
Christian Schwarz
929423cf68 add i3en.3xlarge reference numbers 2024-03-20 18:04:14 +00:00
Christian Schwarz
48b22bd057 walredo: better benchmark 2024-03-20 15:01:14 +00:00
Christian Schwarz
80de8567dc Merge branch 'main' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-03-20 14:32:17 +00:00
Christian Schwarz
f038304274 minimize diff 2024-03-20 14:14:14 +00:00
Christian Schwarz
c853c61c11 replace bench_walredo with my impl 2024-03-20 14:14:09 +00:00
Christian Schwarz
f31f2e9e24 finish benchmark impl (switch to criterion) 2024-03-20 14:09:13 +00:00
Christian Schwarz
cd6d9abafb WIP: throughput-oriented walredo benchmark
duplicates some code from bench_walredo.rs

build using `cargo bench --benches walredo_throughput`
then use like so:

target/release/deps/walredo_throughput-38cf92dd3160bcbd  --managers 100 --clients-per-manager 1

(need to figure out harness = false stuff)

yielded some interesting flamegraphs & tracing overhead, fixed it in
this commit; flamegraphs in my notion
2024-03-15 19:20:34 +00:00
Christian Schwarz
0cf5619e4a Merge remote-tracking branch 'origin/main' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo
Tricky to merge because I had split up walredo.rs in the meantime.
2024-03-13 14:01:57 +00:00
Christian Schwarz
96413743e4 move poison to utils and document 2024-01-31 16:24:19 +00:00
Christian Schwarz
b1b8ca32c8 working impl 2024-01-31 16:08:40 +00:00
Christian Schwarz
70b37cf88f WIP poison 2024-01-31 15:57:34 +00:00
Christian Schwarz
4160d407fb cfg(testing) still needs io::Write 2024-01-31 13:46:26 +00:00
Christian Schwarz
a29ac8b8a2 clippy (again?) 2024-01-31 13:14:36 +01:00
Christian Schwarz
639ed3cb3c clippy + compile errors 2024-01-31 12:26:27 +01:00
Christian Schwarz
2736f61604 error handling 2024-01-31 12:22:21 +01:00
Christian Schwarz
8012b80a45 some cleanup work 2024-01-31 12:17:05 +01:00
Christian Schwarz
a93be15360 remove wal_redo_timeout 2024-01-31 12:15:18 +01:00
Christian Schwarz
2c1652a02b WIP: async walredo 2024-01-31 12:09:24 +01:00
7 changed files with 256 additions and 137 deletions


@@ -91,6 +91,8 @@ pub mod zstd;
pub mod env;
pub mod poison;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

libs/utils/src/poison.rs Normal file

@@ -0,0 +1,121 @@
//! Protect a piece of state from reuse after it is left in an inconsistent state.
//!
//! # Example
//!
//! ```
//! # tokio_test::block_on(async {
//! use utils::poison::Poison;
//! use std::time::Duration;
//!
//! struct State {
//!     clean: bool,
//! }
//! let state = tokio::sync::Mutex::new(Poison::new("mystate", State { clean: true }));
//!
//! let mut mutex_guard = state.lock().await;
//! let mut poison_guard = mutex_guard.check_and_arm()?;
//! let state = poison_guard.data_mut();
//! state.clean = false;
//! // If we get cancelled at this await point, subsequent check_and_arm() calls will fail.
//! tokio::time::sleep(Duration::from_secs(10)).await;
//! state.clean = true;
//! poison_guard.disarm();
//! # Ok::<(), utils::poison::Error>(())
//! # });
//! ```
use tracing::warn;

pub struct Poison<T> {
    what: &'static str,
    state: State,
    data: T,
}

#[derive(Clone, Copy)]
enum State {
    Clean,
    Armed,
    Poisoned { at: chrono::DateTime<chrono::Utc> },
}

impl<T> Poison<T> {
    /// We log `what` at `warn!` level if the [`Guard`] gets dropped without being [`Guard::disarm`]ed.
    pub fn new(what: &'static str, data: T) -> Self {
        Self {
            what,
            state: State::Clean,
            data,
        }
    }

    /// Check for poisoning and return a [`Guard`] that provides access to the wrapped state.
    pub fn check_and_arm(&mut self) -> Result<Guard<T>, Error> {
        match self.state {
            State::Clean => {
                self.state = State::Armed;
                Ok(Guard(self))
            }
            State::Armed => unreachable!("transient state"),
            State::Poisoned { at } => Err(Error::Poisoned {
                what: self.what,
                at,
            }),
        }
    }
}

/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state.
/// Once modifications are done, use [`Self::disarm`].
/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned
/// and subsequent calls to [`Poison::check_and_arm`] will fail with an error.
pub struct Guard<'a, T>(&'a mut Poison<T>);

impl<'a, T> Guard<'a, T> {
    pub fn data(&self) -> &T {
        &self.0.data
    }

    pub fn data_mut(&mut self) -> &mut T {
        &mut self.0.data
    }

    pub fn disarm(self) {
        match self.0.state {
            State::Clean => unreachable!("we set it to Armed in check_and_arm()"),
            State::Armed => {
                self.0.state = State::Clean;
            }
            State::Poisoned { at } => {
                unreachable!("we fail check_and_arm() if it's in that state: {at}")
            }
        }
    }
}

impl<'a, T> Drop for Guard<'a, T> {
    fn drop(&mut self) {
        match self.0.state {
            State::Clean => {
                // set by disarm()
            }
            State::Armed => {
                // still armed => poison it
                let at = chrono::Utc::now();
                self.0.state = State::Poisoned { at };
                warn!(at=?at, "poisoning {}", self.0.what);
            }
            State::Poisoned { at } => {
                unreachable!("we fail check_and_arm() if it's in that state: {at}")
            }
        }
    }
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("poisoned at {at}: {what}")]
    Poisoned {
        what: &'static str,
        at: chrono::DateTime<chrono::Utc>,
    },
}
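The behavior of the `Poison` type added in this file can be illustrated with a minimal, synchronous standalone sketch. It re-implements the same state machine with plain `bool`s (no `chrono` timestamp, no `tracing` warning, `&'static str` error), so it runs without the `utils` crate; the key property is the same: if the guard is dropped without `disarm()` (as happens when a future is cancelled mid-modification), every later `check_and_arm()` fails.

```rust
// Simplified standalone sketch of the poison-guard pattern from
// libs/utils/src/poison.rs (names mirror that file; the real type also
// records a timestamp and logs a warning on poisoning).
struct Poison<T> {
    poisoned: bool,
    armed: bool,
    data: T,
}

struct Guard<'a, T>(&'a mut Poison<T>);

impl<T> Poison<T> {
    fn new(data: T) -> Self {
        Poison { poisoned: false, armed: false, data }
    }

    fn check_and_arm(&mut self) -> Result<Guard<'_, T>, &'static str> {
        if self.poisoned {
            return Err("poisoned");
        }
        self.armed = true;
        Ok(Guard(self))
    }
}

impl<'a, T> Guard<'a, T> {
    fn data_mut(&mut self) -> &mut T {
        &mut self.0.data
    }

    fn disarm(self) {
        // Clean exit: un-arm before Drop runs, so Drop does not poison.
        self.0.armed = false;
    }
}

impl<'a, T> Drop for Guard<'a, T> {
    fn drop(&mut self) {
        if self.0.armed {
            // Guard dropped mid-modification (e.g. future cancelled): poison.
            self.0.armed = false;
            self.0.poisoned = true;
        }
    }
}

fn main() {
    let mut p = Poison::new(0u32);

    // Clean cycle: modify, then disarm.
    {
        let mut g = p.check_and_arm().unwrap();
        *g.data_mut() = 1;
        g.disarm();
    }

    // Simulated cancellation: guard dropped mid-modification, no disarm().
    {
        let mut g = p.check_and_arm().unwrap();
        *g.data_mut() = 2;
        // `g` falls out of scope here while still armed -> poisoned
    }

    // Subsequent attempts to use the state now fail.
    assert!(p.check_and_arm().is_err());
}
```

Note that even `check_and_arm().is_ok()` on a clean value would poison it, because the temporary `Guard` is dropped armed; as the real module's doc example shows, every successful `check_and_arm()` must be paired with `disarm()`.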


@@ -27,25 +27,25 @@
//!
//! # Reference Numbers
//!
//! 2024-04-04 on i3en.3xlarge
//! 2024-04-10 on i3en.3xlarge
//!
//! ```text
//! short/1 time: [25.925 µs 26.060 µs 26.209 µs]
//! short/2 time: [31.277 µs 31.483 µs 31.722 µs]
//! short/4 time: [45.496 µs 45.831 µs 46.182 µs]
//! short/8 time: [84.298 µs 84.920 µs 85.566 µs]
//! short/16 time: [185.04 µs 186.41 µs 187.88 µs]
//! short/32 time: [385.01 µs 386.77 µs 388.70 µs]
//! short/64 time: [770.24 µs 773.04 µs 776.04 µs]
//! short/128 time: [1.5017 ms 1.5064 ms 1.5113 ms]
//! medium/1 time: [106.65 µs 107.20 µs 107.85 µs]
//! medium/2 time: [153.28 µs 154.24 µs 155.56 µs]
//! medium/4 time: [325.67 µs 327.01 µs 328.71 µs]
//! medium/8 time: [646.82 µs 650.17 µs 653.91 µs]
//! medium/16 time: [1.2645 ms 1.2701 ms 1.2762 ms]
//! medium/32 time: [2.4409 ms 2.4550 ms 2.4692 ms]
//! medium/64 time: [4.6814 ms 4.7114 ms 4.7408 ms]
//! medium/128 time: [8.7790 ms 8.9037 ms 9.0282 ms]
//! short/1 time: [25.197 µs 25.322 µs 25.471 µs]
//! short/2 time: [34.661 µs 34.810 µs 34.994 µs]
//! short/4 time: [45.369 µs 45.686 µs 46.036 µs]
//! short/8 time: [76.653 µs 77.234 µs 77.901 µs]
//! short/16 time: [140.61 µs 142.45 µs 144.33 µs]
//! short/32 time: [275.59 µs 277.42 µs 279.32 µs]
//! short/64 time: [545.53 µs 547.73 µs 550.05 µs]
//! short/128 time: [1.0531 ms 1.0606 ms 1.0693 ms]
//! medium/1 time: [114.98 µs 115.58 µs 116.29 µs]
//! medium/2 time: [159.51 µs 160.12 µs 160.89 µs]
//! medium/4 time: [330.33 µs 334.07 µs 338.96 µs]
//! medium/8 time: [660.55 µs 666.29 µs 673.20 µs]
//! medium/16 time: [1.3094 ms 1.3210 ms 1.3347 ms]
//! medium/32 time: [2.5856 ms 2.6100 ms 2.6387 ms]
//! medium/64 time: [4.9600 ms 5.0052 ms 5.0545 ms]
//! medium/128 time: [9.2382 ms 9.4334 ms 9.6216 ms]
//! ```
use bytes::{Buf, Bytes};
@@ -126,7 +126,7 @@ fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration
}
rt.block_on(async move {
let mut total_wallclock_time = std::time::Duration::from_millis(0);
let mut total_wallclock_time = Duration::ZERO;
while let Some(res) = tasks.join_next().await {
total_wallclock_time += res.unwrap();
}


@@ -3098,15 +3098,7 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, Bytes)> {
let cache = page_cache::get();
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) = cache
.lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
.await?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
return None;
}
async fn get_ready_ancestor_timeline(


@@ -239,6 +239,7 @@ impl PostgresRedoManager {
// Relational WAL records are applied using wal-redo-postgres
let result = proc
.apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
.await
.context("apply_wal_records");
let duration = started_at.elapsed();


@@ -6,21 +6,18 @@ use crate::{
};
use anyhow::Context;
use bytes::Bytes;
use nix::poll::{PollFd, PollFlags};
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
#[cfg(feature = "testing")]
use std::sync::atomic::AtomicUsize;
use std::{
collections::VecDeque,
io::{Read, Write},
process::{ChildStdin, ChildStdout, Command, Stdio},
sync::{Mutex, MutexGuard},
process::{Command, Stdio},
time::Duration,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, error, instrument, Instrument};
use utils::{lsn::Lsn, nonblock::set_nonblock};
use utils::{lsn::Lsn, poison::Poison};
mod no_leak_child;
/// The IPC protocol that pageserver and walredo process speak over their shared pipe.
@@ -32,20 +29,20 @@ pub struct WalRedoProcess {
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
stdout: tokio::sync::Mutex<Poison<ProcessOutput>>,
stdin: tokio::sync::Mutex<Poison<ProcessInput>>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
}
struct ProcessInput {
stdin: ChildStdin,
stdin: tokio::process::ChildStdin,
n_requests: usize,
}
struct ProcessOutput {
stdout: ChildStdout,
stdout: tokio::process::ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
}
@@ -100,17 +97,10 @@ impl WalRedoProcess {
let stderr = child.stderr.take().unwrap();
let stderr = tokio::process::ChildStderr::from_std(stderr)
.context("convert to tokio::ChildStderr")?;
macro_rules! set_nonblock_or_log_err {
($file:ident) => {{
let res = set_nonblock($file.as_raw_fd());
if let Err(e) = &res {
error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
}
res
}};
}
set_nonblock_or_log_err!(stdin)?;
set_nonblock_or_log_err!(stdout)?;
let stdin =
tokio::process::ChildStdin::from_std(stdin).context("convert to tokio::ChildStdin")?;
let stdout = tokio::process::ChildStdout::from_std(stdout)
.context("convert to tokio::ChildStdout")?;
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
@@ -155,15 +145,21 @@ impl WalRedoProcess {
conf,
tenant_shard_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdin,
n_requests: 0,
}),
stdout: Mutex::new(ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
stdin: tokio::sync::Mutex::new(Poison::new(
"stdin",
ProcessInput {
stdin,
n_requests: 0,
},
)),
stdout: tokio::sync::Mutex::new(Poison::new(
"stdout",
ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
},
)),
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
})
@@ -176,11 +172,14 @@ impl WalRedoProcess {
.id()
}
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
pub(crate) fn apply_wal_records(
/// Apply given WAL records ('records') over an old page image. Returns
/// new page image.
///
/// # Cancel-Safety
///
/// Cancellation safe.
#[instrument(skip_all, level = tracing::Level::DEBUG, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
pub(crate) async fn apply_wal_records(
&self,
rel: RelTag,
blknum: u32,
@@ -189,7 +188,6 @@ impl WalRedoProcess {
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let tag = protocol::BufferTag { rel, blknum };
let input = self.stdin.lock().unwrap();
// Serialize all the messages to send the WAL redo process first.
//
@@ -219,7 +217,11 @@ impl WalRedoProcess {
protocol::build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
let Ok(res) =
tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await
else {
anyhow::bail!("WAL redo timed out");
};
if res.is_err() {
// not all of these can be caused by this particular input, however these are so rare
@@ -230,41 +232,27 @@ impl WalRedoProcess {
res
}
fn apply_wal_records0(
&self,
writebuf: &[u8],
input: MutexGuard<ProcessInput>,
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
let mut nwrite = 0usize;
while nwrite < writebuf.len() {
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
let n = loop {
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If 'stdin' is writeable, do write.
let in_revents = stdin_pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
}
if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
}
}
let request_no = proc.n_requests;
proc.n_requests += 1;
drop(proc);
/// # Cancel-Safety
///
/// When not polled to completion (e.g. because in `tokio::select!` another
/// branch becomes ready before this future), concurrent and subsequent
/// calls may fail due to [`utils::poison::Poison::check_and_arm`] calls.
/// Dispose of this process instance and create a new one.
async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
let request_no = {
let mut lock_guard = self.stdin.lock().await;
let mut poison_guard = lock_guard.check_and_arm()?;
let input = poison_guard.data_mut();
input
.stdin
.write_all(writebuf)
.await
.context("write to walredo stdin")?;
let request_no = input.n_requests;
input.n_requests += 1;
poison_guard.disarm();
request_no
};
// To improve walredo performance we separate sending requests and receiving
// responses. They are protected by different mutexes (output and input).
@@ -278,40 +266,19 @@ impl WalRedoProcess {
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut output = self.stdout.lock().unwrap();
let mut lock_guard = self.stdout.lock().await;
let mut poison_guard = lock_guard.check_and_arm()?;
let output = poison_guard.data_mut();
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
while nresult < BLCKSZ.into() {
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(
&mut stdout_pollfds[..],
wal_redo_timeout.as_millis() as i32,
) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = stdout_pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
}
if out_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
}
}
output
.stdout
.read_exact(&mut resultbuf)
.await
.context("read walredo stdout")?;
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));
@@ -359,6 +326,7 @@ impl WalRedoProcess {
break;
}
}
poison_guard.disarm();
Ok(res)
}
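The comment above describes how responses, which the walredo child writes strictly in request order, are matched to waiters that may arrive out of order: each waiter reads on behalf of everyone until its own `request_no` is buffered, takes its slot (leaving `None`), and trims consumed slots from the front. Below is a simplified standalone sketch of just that bookkeeping, with field names following the diff; the `u32` payloads and the `read_next` callback are stand-ins for the `Bytes` page images and the awaited `read_exact` on the child's stdout.

```rust
use std::collections::VecDeque;

// Simplified sketch of the pending_responses / n_processed_responses scheme.
struct Output {
    pending_responses: VecDeque<Option<u32>>,
    n_processed_responses: usize,
}

impl Output {
    fn take_response(&mut self, request_no: usize, mut read_next: impl FnMut() -> u32) -> u32 {
        // Read (in order) until the response for `request_no` is buffered.
        while self.n_processed_responses + self.pending_responses.len() <= request_no {
            self.pending_responses.push_back(Some(read_next()));
        }
        // Take our slot; the `None` left behind keeps other waiters' offsets valid.
        let slot = request_no - self.n_processed_responses;
        let resp = self.pending_responses[slot].take().unwrap();
        // Truncate empty elements from the front, advancing the processed count.
        while let Some(None) = self.pending_responses.front() {
            self.pending_responses.pop_front();
            self.n_processed_responses += 1;
        }
        resp
    }
}

fn main() {
    let mut out = Output {
        pending_responses: VecDeque::new(),
        n_processed_responses: 0,
    };
    // The child answers requests 0, 1, 2 in order with 100, 101, 102.
    let mut replies = 100u32..;
    // Waiter for request 1 arrives first: it reads and parks response 0 too.
    assert_eq!(out.take_response(1, || replies.next().unwrap()), 101);
    // Waiter for request 0 finds its response already parked.
    assert_eq!(out.take_response(0, || replies.next().unwrap()), 100);
    // Waiter for request 2 reads its own response.
    assert_eq!(out.take_response(2, || replies.next().unwrap()), 102);
    assert_eq!(out.n_processed_responses, 3);
}
```

In the real code the read under the `stdout` mutex is additionally wrapped in the poison guard, so a cancellation mid-read poisons the output side rather than leaving a half-read response in the pipe.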
@@ -378,6 +346,7 @@ impl WalRedoProcess {
let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
use std::io::Write;
let res = std::fs::OpenOptions::new()
.write(true)
.create_new(true)


@@ -8,6 +8,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
@@ -27,10 +28,16 @@ def test_many_small_tenants(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
_env = setup_env(neon_env_builder, 2) # vary this to the desired number of tenants
_pg_bin = pg_bin
# drop into pdb so that we can debug pageserver interactively, use pdb here
page_cache_size = int(40 * (1024**3) / 8192)
max_file_descriptors = 500000
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
)
_env = setup_env(neon_env_builder, 3000, pg_bin) # vary this to the desired number of tenants
# drop into pdb so that we can debug pageserver interactively, use pdb here
# For example, to interactively examine pageserver startup behavior, call
# _env.pageserver.stop(immediate=True)
# _env.pageserver.start()
@@ -41,6 +48,7 @@ def test_many_small_tenants(
def setup_env(
neon_env_builder: NeonEnvBuilder,
n_tenants: int,
pg_bin: PgBin,
) -> NeonEnv:
def setup_template(env: NeonEnv):
# create our template tenant
@@ -60,12 +68,38 @@ def setup_env(
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ep = env.endpoints.create_start("main", tenant_id=template_tenant)
ep.safe_psql("create table foo(b text)")
for _ in range(0, 8):
ep.safe_psql("insert into foo(b) values ('some text')")
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
ep.stop_and_destroy()
ps_http = env.pageserver.http_client()
scale = 10
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
for _ in range(
0, 17
): # some prime number to avoid potential resonances with the "_threshold" variables from the config
# the L0s produced by this appear to have size ~5MiB
num_txns = 10_000
pg_bin.run_capture(
["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()]
)
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
# for reference, the output at scale=6 looked like so (306M total)
# ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59
# total 306M
# 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829
# 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919
# 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71
# 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791
# 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1
# 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9
# 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639
# 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799
# 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19
# 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021
ep.stop_and_destroy()
return (template_tenant, template_timeline, config)
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv: