Compare commits

...

37 Commits

Author SHA1 Message Date
Christian Schwarz
7d232f21d2 amend benchmarking setup 2024-03-25 16:46:49 +00:00
Christian Schwarz
48b85f9e0b DO NOT MERGE: diable materialized page cache LOOKUP (not materialization) for benchmarking 2024-03-25 15:03:26 +00:00
Christian Schwarz
5b01c16454 Merge remote-tracking branch 'origin/main' into problame/async-walredo/benchmarking-2024-03-22--1 2024-03-25 14:27:52 +00:00
Christian Schwarz
8265739e91 HACK: restore old impl, make runtime configurable (how to: reconfigure via HTTP, then kill existing walredo procs) 2024-03-22 10:23:53 +00:00
Christian Schwarz
3aa4fc593e DO NOT MERGE: benchmarking setup 2024-03-22 09:53:02 +00:00
Christian Schwarz
655d3b6468 audit for cancellation-safety 2024-03-21 15:38:45 +00:00
Christian Schwarz
3dfc7de99b use chrono::DateTime for Poisoned errors 2024-03-21 15:31:24 +00:00
Christian Schwarz
3a5994bb43 Merge branch 'main' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-03-21 14:26:05 +00:00
Christian Schwarz
86b0df973a apply review suggestion https://github.com/neondatabase/neon/pull/7190#discussion_r1533884248 2024-03-21 13:23:08 +00:00
Christian Schwarz
e669b6d852 Merge branch 'problame/async-walredo/better-benchmark' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-03-21 13:05:06 +00:00
Christian Schwarz
b2f5b84c2f cargo fmt 2024-03-21 13:00:55 +00:00
Christian Schwarz
a21409b509 measure results 2024-03-21 12:57:21 +00:00
Christian Schwarz
c6a74bd17f Merge branch 'problame/async-walredo/better-benchmark' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-03-21 12:31:05 +00:00
Christian Schwarz
d6c45625e6 update numbers (the yield makes a big difference, who would have thunken) 2024-03-21 12:30:10 +00:00
Christian Schwarz
db3333eecb yield after ever redo execution 2024-03-21 10:53:32 +00:00
Christian Schwarz
15cfa7b6e9 apply review suggestions 2024-03-21 10:53:26 +00:00
Christian Schwarz
8677136bc8 Merge branch 'problame/async-walredo/better-benchmark' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-03-20 18:11:28 +00:00
Christian Schwarz
a37d713f2b Merge branch 'main' into problame/async-walredo/better-benchmark 2024-03-20 18:11:13 +00:00
Christian Schwarz
081af38076 Merge branch 'main' into problame/async-walredo/better-benchmark 2024-03-20 18:06:06 +00:00
Christian Schwarz
929423cf68 add i3en.3xlarge reference numbers 2024-03-20 18:04:14 +00:00
Christian Schwarz
48b22bd057 walredo: better benchmark 2024-03-20 15:01:14 +00:00
Christian Schwarz
80de8567dc Merge branch 'main' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo 2024-03-20 14:32:17 +00:00
Christian Schwarz
f038304274 minimize diff 2024-03-20 14:14:14 +00:00
Christian Schwarz
c853c61c11 replace bench_walredo with my impl 2024-03-20 14:14:09 +00:00
Christian Schwarz
f31f2e9e24 finish benchmark impl (switch to criterion) 2024-03-20 14:09:13 +00:00
Christian Schwarz
cd6d9abafb WIP: throughput-oriented walredo benchmark
duplicates some code from bench_walredo.rs

build using `cargo bench --benches walredo_throughput`
then use like so:

target/release/deps/walredo_throughput-38cf92dd3160bcbd  --managers 100 --clients-per-manager 1

(need to figure out harness = false stuff)

yielded some interesting flamegraphs & tracing overhead, fixed it in
this commit; flamegraphs in my notion
2024-03-15 19:20:34 +00:00
Christian Schwarz
0cf5619e4a Merge remote-tracking branch 'origin/main' into problame/integrate-tokio-epoll-uring/benchmarking/2024-01-31-prs/async-walredo
Tricky to merge because I had split up walredo.rs in the meantime.
2024-03-13 14:01:57 +00:00
Christian Schwarz
96413743e4 move poison to utils and document 2024-01-31 16:24:19 +00:00
Christian Schwarz
b1b8ca32c8 working impl 2024-01-31 16:08:40 +00:00
Christian Schwarz
70b37cf88f WIP poison 2024-01-31 15:57:34 +00:00
Christian Schwarz
4160d407fb cfg(testing) still needs io::Write 2024-01-31 13:46:26 +00:00
Christian Schwarz
a29ac8b8a2 clippy (again?) 2024-01-31 13:14:36 +01:00
Christian Schwarz
639ed3cb3c clippy + compile errors 2024-01-31 12:26:27 +01:00
Christian Schwarz
2736f61604 error handling 2024-01-31 12:22:21 +01:00
Christian Schwarz
8012b80a45 some cleanup work 2024-01-31 12:17:05 +01:00
Christian Schwarz
a93be15360 remove wal_redo_timeout 2024-01-31 12:15:18 +01:00
Christian Schwarz
2c1652a02b WIP: async walredo 2024-01-31 12:09:24 +01:00
16 changed files with 1082 additions and 435 deletions

14
Cargo.lock generated
View File

@@ -6081,6 +6081,19 @@ dependencies = [
"xattr",
]
[[package]]
name = "tokio-test"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e89b3cbabd3ae862100094ae433e1def582cf86451b4e9bf83aa7ac1d8a7d719"
dependencies = [
"async-stream",
"bytes",
"futures-core",
"tokio",
"tokio-stream",
]
[[package]]
name = "tokio-tungstenite"
version = "0.20.0"
@@ -6572,6 +6585,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-tar",
"tokio-test",
"tokio-util",
"tracing",
"tracing-error",

View File

@@ -169,6 +169,7 @@ tokio-postgres-rustls = "0.11.0"
tokio-rustls = "0.25"
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-test = "0.4.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.7"
toml_edit = "0.19"

View File

@@ -70,6 +70,7 @@ criterion.workspace = true
hex-literal.workspace = true
camino-tempfile.workspace = true
serde_assert.workspace = true
tokio-test.workspace = true
[[bench]]
name = "benchmarks"

View File

@@ -89,6 +89,8 @@ pub mod yielding_loop;
pub mod zstd;
pub mod poison;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
///
/// we have several cases:

121
libs/utils/src/poison.rs Normal file
View File

@@ -0,0 +1,121 @@
//! Protect a piece of state from reuse after it is left in an inconsistent state.
//!
//! # Example
//!
//! ```
//! # tokio_test::block_on(async {
//! use utils::poison::Poison;
//! use std::time::Duration;
//!
//! struct State {
//! clean: bool,
//! }
//! let state = tokio::sync::Mutex::new(Poison::new("mystate", State { clean: true }));
//!
//! let mut mutex_guard = state.lock().await;
//! let mut poison_guard = mutex_guard.check_and_arm()?;
//! let state = poison_guard.data_mut();
//! state.clean = false;
//! // If we get cancelled at this await point, subsequent check_and_arm() calls will fail.
//! tokio::time::sleep(Duration::from_secs(10)).await;
//! state.clean = true;
//! poison_guard.disarm();
//! # Ok::<(), utils::poison::Error>(())
//! # });
//! ```
use tracing::warn;
pub struct Poison<T> {
what: &'static str,
state: State,
data: T,
}
#[derive(Clone, Copy)]
enum State {
Clean,
Armed,
Poisoned { at: chrono::DateTime<chrono::Utc> },
}
impl<T> Poison<T> {
/// We log `what` `warning!` level if the [`Guard`] gets dropped without being [`Guard::disarm`]ed.
pub fn new(what: &'static str, data: T) -> Self {
Self {
what,
state: State::Clean,
data,
}
}
/// Check for poisoning and return a [`Guard`] that provides access to the wrapped state.
pub fn check_and_arm(&mut self) -> Result<Guard<T>, Error> {
match self.state {
State::Clean => {
self.state = State::Armed;
Ok(Guard(self))
}
State::Armed => unreachable!("transient state"),
State::Poisoned { at } => Err(Error::Poisoned {
what: self.what,
at,
}),
}
}
}
/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state.
/// Once modifications are done, use [`Self::disarm`].
/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned
/// and subsequent calls to [`Poison::check_and_arm`] will fail with an error.
pub struct Guard<'a, T>(&'a mut Poison<T>);
impl<'a, T> Guard<'a, T> {
pub fn data(&self) -> &T {
&self.0.data
}
pub fn data_mut(&mut self) -> &mut T {
&mut self.0.data
}
pub fn disarm(self) {
match self.0.state {
State::Clean => unreachable!("we set it to Armed in check_and_arm()"),
State::Armed => {
self.0.state = State::Clean;
}
State::Poisoned { at } => {
unreachable!("we fail check_and_arm() if it's in that state: {at}")
}
}
}
}
impl<'a, T> Drop for Guard<'a, T> {
fn drop(&mut self) {
match self.0.state {
State::Clean => {
// set by disarm()
}
State::Armed => {
// still armed => poison it
let at = chrono::Utc::now();
self.0.state = State::Poisoned { at };
warn!(at=?at, "poisoning {}", self.0.what);
}
State::Poisoned { at } => {
unreachable!("we fail check_and_arm() if it's in that state: {at}")
}
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("poisoned at {at}: {what}")]
Poisoned {
what: &'static str,
at: chrono::DateTime<chrono::Utc>,
},
}

View File

@@ -30,22 +30,22 @@
//! 2024-03-20 on i3en.3xlarge
//!
//! ```text
//! short/1 time: [26.483 µs 26.614 µs 26.767 µs]
//! short/2 time: [32.223 µs 32.465 µs 32.767 µs]
//! short/4 time: [47.203 µs 47.583 µs 47.984 µs]
//! short/8 time: [89.135 µs 89.612 µs 90.139 µs]
//! short/16 time: [190.12 µs 191.52 µs 192.88 µs]
//! short/32 time: [380.96 µs 382.63 µs 384.20 µs]
//! short/64 time: [736.86 µs 741.07 µs 745.03 µs]
//! short/128 time: [1.4106 ms 1.4206 ms 1.4294 ms]
//! medium/1 time: [111.81 µs 112.25 µs 112.79 µs]
//! medium/2 time: [158.26 µs 159.13 µs 160.21 µs]
//! medium/4 time: [334.65 µs 337.14 µs 340.07 µs]
//! medium/8 time: [675.32 µs 679.91 µs 685.25 µs]
//! medium/16 time: [1.2929 ms 1.2996 ms 1.3067 ms]
//! medium/32 time: [2.4295 ms 2.4461 ms 2.4623 ms]
//! medium/64 time: [4.3973 ms 4.4458 ms 4.4875 ms]
//! medium/128 time: [7.5955 ms 7.7847 ms 7.9481 ms]
//! short/1 time: [29.804 µs 29.959 µs 30.142 µs]
//! short/2 time: [33.847 µs 34.020 µs 34.223 µs]
//! short/4 time: [44.016 µs 44.259 µs 44.538 µs]
//! short/8 time: [77.512 µs 78.146 µs 78.796 µs]
//! short/16 time: [139.74 µs 141.37 µs 143.03 µs]
//! short/32 time: [269.00 µs 271.98 µs 275.04 µs]
//! short/64 time: [526.08 µs 528.47 µs 531.24 µs]
//! short/128 time: [1.0206 ms 1.0264 ms 1.0329 ms]
//! medium/1 time: [113.87 µs 114.41 µs 115.07 µs]
//! medium/2 time: [157.36 µs 158.05 µs 158.93 µs]
//! medium/4 time: [325.91 µs 328.01 µs 330.61 µs]
//! medium/8 time: [656.48 µs 663.55 µs 671.50 µs]
//! medium/16 time: [1.2956 ms 1.3069 ms 1.3203 ms]
//! medium/32 time: [2.5294 ms 2.5510 ms 2.5768 ms]
//! medium/64 time: [4.8728 ms 4.9262 ms 4.9862 ms]
//! medium/128 time: [8.9833 ms 9.1374 ms 9.2848 ms]
//! ```
use bytes::{Buf, Bytes};
@@ -126,7 +126,7 @@ fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration
}
rt.block_on(async move {
let mut total_wallclock_time = std::time::Duration::from_millis(0);
let mut total_wallclock_time = Duration::ZERO;
while let Some(res) = tasks.join_next().await {
total_wallclock_time += res.unwrap();
}

View File

@@ -58,7 +58,6 @@ pub mod defaults {
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
pub const DEFAULT_SUPERUSER: &str = "cloud_admin";
@@ -105,7 +104,6 @@ pub mod defaults {
#listen_http_addr = '{DEFAULT_HTTP_LISTEN_ADDR}'
#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
#page_cache_size = {DEFAULT_PAGE_CACHE_SIZE}
#max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS}
@@ -178,8 +176,6 @@ pub struct PageServerConf {
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
pub wait_lsn_timeout: Duration,
// How long to wait for WAL redo to complete.
pub wal_redo_timeout: Duration,
pub superuser: String,
@@ -342,7 +338,6 @@ struct PageServerConfigBuilder {
availability_zone: BuilderValue<Option<String>>,
wait_lsn_timeout: BuilderValue<Duration>,
wal_redo_timeout: BuilderValue<Duration>,
superuser: BuilderValue<String>,
@@ -413,8 +408,6 @@ impl PageServerConfigBuilder {
availability_zone: Set(None),
wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
.expect("cannot parse default wait lsn timeout")),
wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
.expect("cannot parse default wal redo timeout")),
superuser: Set(DEFAULT_SUPERUSER.to_string()),
page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
@@ -507,10 +500,6 @@ impl PageServerConfigBuilder {
self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
}
pub fn wal_redo_timeout(&mut self, wal_redo_timeout: Duration) {
self.wal_redo_timeout = BuilderValue::Set(wal_redo_timeout)
}
pub fn superuser(&mut self, superuser: String) {
self.superuser = BuilderValue::Set(superuser)
}
@@ -688,7 +677,6 @@ impl PageServerConfigBuilder {
listen_http_addr,
availability_zone,
wait_lsn_timeout,
wal_redo_timeout,
superuser,
page_cache_size,
max_file_descriptors,
@@ -912,7 +900,6 @@ impl PageServerConf {
"listen_http_addr" => builder.listen_http_addr(parse_toml_string(key, item)?),
"availability_zone" => builder.availability_zone(Some(parse_toml_string(key, item)?)),
"wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
"wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
"page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
"max_file_descriptors" => {
@@ -1045,7 +1032,6 @@ impl PageServerConf {
PageServerConf {
id: NodeId(0),
wait_lsn_timeout: Duration::from_secs(60),
wal_redo_timeout: Duration::from_secs(60),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
@@ -1235,7 +1221,6 @@ listen_pg_addr = '127.0.0.1:64000'
listen_http_addr = '127.0.0.1:9898'
wait_lsn_timeout = '111 s'
wal_redo_timeout = '111 s'
page_cache_size = 444
max_file_descriptors = 333
@@ -1276,7 +1261,6 @@ background_task_maximum_delay = '334 s'
listen_http_addr: defaults::DEFAULT_HTTP_LISTEN_ADDR.to_string(),
availability_zone: None,
wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
@@ -1357,7 +1341,6 @@ background_task_maximum_delay = '334 s'
listen_http_addr: "127.0.0.1:9898".to_string(),
availability_zone: None,
wait_lsn_timeout: Duration::from_secs(111),
wal_redo_timeout: Duration::from_secs(111),
superuser: "zzzz".to_string(),
page_cache_size: 444,
max_file_descriptors: 333,

View File

@@ -2080,6 +2080,16 @@ async fn put_io_engine_handler(
json_response(StatusCode::OK, ())
}
async fn put_walredo_process_kind(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&r, None)?;
let kind: crate::walredo::ProcessKind = json_request(&mut r).await?;
crate::walredo::set_process_kind(kind);
json_response(StatusCode::OK, ())
}
/// Polled by control plane.
///
/// See [`crate::utilization`].
@@ -2407,6 +2417,9 @@ pub fn make_router(
|r| api_handler(r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put("/v1/walredo_process_kind", |r| {
api_handler(r, put_walredo_process_kind)
})
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.any(handler_404))
}

View File

@@ -1813,6 +1813,20 @@ impl Default for WalRedoProcessCounters {
pub(crate) static WAL_REDO_PROCESS_COUNTERS: Lazy<WalRedoProcessCounters> =
Lazy::new(WalRedoProcessCounters::default);
#[cfg(not(test))]
pub(crate) mod wal_redo {
use super::*;
pub(crate) static PROCESS_KIND: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_wal_redo_process_kind",
"The configured process kind for walredo",
&["kind"],
)
.unwrap()
});
}
/// Similar to `prometheus::HistogramTimer` but does not record on drop.
pub(crate) struct StorageTimeMetricsTimer {
metrics: StorageTimeMetrics,

View File

@@ -2852,15 +2852,7 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, Bytes)> {
let cache = page_cache::get();
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) = cache
.lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
.await?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
return None;
}
async fn get_ready_ancestor_timeline(

View File

@@ -20,6 +20,7 @@
/// Process lifecycle and abstracction for the IPC protocol.
mod process;
pub(crate) use process::{Kind as ProcessKind, set_kind as set_process_kind};
/// Code to apply [`NeonWalRecord`]s.
pub(crate) mod apply_neon;
@@ -53,7 +54,7 @@ pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
redo_process: RwLock<Option<Arc<process::WalRedoProcess>>>,
redo_process: RwLock<Option<Arc<process::Process>>>,
}
///
@@ -98,9 +99,9 @@ impl PostgresRedoManager {
img,
base_img_lsn,
&records[batch_start..i],
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);
@@ -118,9 +119,9 @@ impl PostgresRedoManager {
img,
base_img_lsn,
&records[batch_start..],
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
@@ -174,15 +175,17 @@ impl PostgresRedoManager {
///
/// Process one request for WAL redo using wal-redo postgres
///
/// # Cancel-Safety
///
/// Cancellation safe.
#[allow(clippy::too_many_arguments)]
fn apply_batch_postgres(
async fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
base_img: Option<Bytes>,
base_img_lsn: Lsn,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
) -> anyhow::Result<Bytes> {
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
@@ -192,7 +195,7 @@ impl PostgresRedoManager {
let mut n_attempts = 0u32;
loop {
// launch the WAL redo process on first use
let proc: Arc<process::WalRedoProcess> = {
let proc: Arc<process::Process> = {
let proc_guard = self.redo_process.read().unwrap();
match &*proc_guard {
None => {
@@ -203,7 +206,7 @@ impl PostgresRedoManager {
None => {
let start = Instant::now();
let proc = Arc::new(
process::WalRedoProcess::launch(
process::Process::launch(
self.conf,
self.tenant_shard_id,
pg_version,
@@ -232,7 +235,8 @@ impl PostgresRedoManager {
// Relational WAL records are applied using wal-redo-postgres
let result = proc
.apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
.apply_wal_records(rel, blknum, &base_img, records)
.await
.context("apply_wal_records");
let duration = started_at.elapsed();

View File

@@ -1,408 +1,112 @@
use self::no_leak_child::NoLeakChild;
use crate::{
config::PageServerConf,
metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
walrecord::NeonWalRecord,
};
use anyhow::Context;
use bytes::Bytes;
use nix::poll::{PollFd, PollFlags};
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
#[cfg(feature = "testing")]
use std::sync::atomic::AtomicUsize;
use std::{
collections::VecDeque,
io::{Read, Write},
process::{ChildStdin, ChildStdout, Command, Stdio},
sync::{Mutex, MutexGuard},
sync::atomic::{AtomicU8, Ordering},
time::Duration,
};
use tracing::{debug, error, instrument, Instrument};
use utils::{lsn::Lsn, nonblock::set_nonblock};
use bytes::Bytes;
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use utils::lsn::Lsn;
use crate::{config::PageServerConf, walrecord::NeonWalRecord};
mod live_reconfig;
mod no_leak_child;
/// The IPC protocol that pageserver and walredo process speak over their shared pipe.
mod protocol;
pub struct WalRedoProcess {
#[allow(dead_code)]
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
mod process_impl {
pub(super) mod process_async;
pub(super) mod process_std;
}
struct ProcessInput {
stdin: ChildStdin,
n_requests: usize,
#[derive(Clone, Copy, Debug, serde::Serialize, serde::Deserialize)]
#[repr(u8)]
pub(crate) enum Kind {
Sync,
Async,
}
struct ProcessOutput {
stdout: ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
impl TryFrom<u8> for Kind {
type Error = u8;
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (Kind::Sync as u8) => Kind::Sync,
v if v == (Kind::Async as u8) => Kind::Async,
x => return Err(x),
})
}
}
impl WalRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(pg_version=pg_version))]
pub(crate) fn launch(
static PROCESS_KIND: AtomicU8 = AtomicU8::new(Kind::Async as u8);
pub(crate) fn set_kind(kind: Kind) {
PROCESS_KIND.store(kind as u8, std::sync::atomic::Ordering::Relaxed);
#[cfg(not(test))]
{
let metric = &crate::metrics::wal_redo::PROCESS_KIND;
metric.reset();
metric.with_label_values(&[&format!("{kind:?}")]).set(1);
}
}
fn get() -> Kind {
Kind::try_from(PROCESS_KIND.load(Ordering::Relaxed)).unwrap()
}
pub(crate) enum Process {
Sync(process_impl::process_std::WalRedoProcess),
Async(process_impl::process_async::WalRedoProcess),
}
impl Process {
#[inline(always)]
pub fn launch(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
pg_version: u32,
) -> anyhow::Result<Self> {
crate::span::debug_assert_current_span_has_tenant_id();
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
use no_leak_child::NoLeakChildCommandExt;
// Start postgres itself
let child = Command::new(pg_bin_dir_path.join("postgres"))
// the first arg must be --wal-redo so the child process enters into walredo mode
.arg("--wal-redo")
// the child doesn't process this arg, but, having it in the argv helps indentify the
// walredo process for a particular tenant when debugging a pagserver
.args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
// NB: The redo process is not trusted after we sent it the first
// walredo work. Before that, it is trusted. Specifically, we trust
// it to
// 1. close all file descriptors except stdin, stdout, stderr because
// pageserver might not be 100% diligent in setting FD_CLOEXEC on all
// the files it opens, and
// 2. to use seccomp to sandbox itself before processing the first
// walredo request.
.spawn_no_leak_child(tenant_shard_id)
.context("spawn process")?;
WAL_REDO_PROCESS_COUNTERS.started.inc();
let mut child = scopeguard::guard(child, |child| {
error!("killing wal-redo-postgres process due to a problem during launch");
child.kill_and_wait(WalRedoKillCause::Startup);
});
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let stderr = tokio::process::ChildStderr::from_std(stderr)
.context("convert to tokio::ChildStderr")?;
macro_rules! set_nonblock_or_log_err {
($file:ident) => {{
let res = set_nonblock($file.as_raw_fd());
if let Err(e) = &res {
error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
}
res
}};
}
set_nonblock_or_log_err!(stdin)?;
set_nonblock_or_log_err!(stdout)?;
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
tokio::spawn(
async move {
scopeguard::defer! {
debug!("wal-redo-postgres stderr_logger_task finished");
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
}
debug!("wal-redo-postgres stderr_logger_task started");
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
use tokio::io::AsyncBufReadExt;
let mut stderr_lines = tokio::io::BufReader::new(stderr);
let mut buf = Vec::new();
let res = loop {
buf.clear();
// TODO we don't trust the process to cap its stderr length.
// Currently it can do unbounded Vec allocation.
match stderr_lines.read_until(b'\n', &mut buf).await {
Ok(0) => break Ok(()), // eof
Ok(num_bytes) => {
let output = String::from_utf8_lossy(&buf[..num_bytes]);
error!(%output, "received output");
}
Err(e) => {
break Err(e);
}
}
};
match res {
Ok(()) => (),
Err(e) => {
error!(error=?e, "failed to read from walredo stderr");
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
);
Ok(Self {
conf,
tenant_shard_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdin,
n_requests: 0,
}),
stdout: Mutex::new(ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
Ok(match get() {
Kind::Sync => Self::Sync(process_impl::process_std::WalRedoProcess::launch(
conf,
tenant_shard_id,
pg_version,
)?),
Kind::Async => Self::Async(process_impl::process_async::WalRedoProcess::launch(
conf,
tenant_shard_id,
pg_version,
)?),
})
}
pub(crate) fn id(&self) -> u32 {
self.child
.as_ref()
.expect("must not call this during Drop")
.id()
}
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
pub(crate) fn apply_wal_records(
#[inline(always)]
pub(crate) async fn apply_wal_records(
&self,
rel: RelTag,
blknum: u32,
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let tag = protocol::BufferTag { rel, blknum };
let input = self.stdin.lock().unwrap();
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
//
// Most requests start with a before-image with BLCKSZ bytes, followed by
// by some other WAL records. Start with a buffer that can hold that
// comfortably.
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
protocol::build_push_page_msg(tag, img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
if let NeonWalRecord::Postgres {
will_init: _,
rec: postgres_rec,
} = rec
{
protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
match self {
Process::Sync(p) => {
p.apply_wal_records(
rel,
blknum,
base_img,
records,
Duration::from_secs(100000), /* TODO remove */
)
.await
}
}
protocol::build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
if res.is_err() {
// not all of these can be caused by this particular input, however these are so rare
// in tests so capture all.
self.record_and_log(&writebuf);
}
res
}
fn apply_wal_records0(
&self,
writebuf: &[u8],
input: MutexGuard<ProcessInput>,
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
let mut nwrite = 0usize;
while nwrite < writebuf.len() {
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
let n = loop {
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If 'stdin' is writeable, do write.
let in_revents = stdin_pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
}
if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
}
}
let request_no = proc.n_requests;
proc.n_requests += 1;
drop(proc);
// To improve walredo performance we separate sending requests and receiving
// responses. Them are protected by different mutexes (output and input).
// If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
// then there is not warranty that T1 will first granted output mutex lock.
// To address this issue we maintain number of sent requests, number of processed
// responses and ring buffer with pending responses. After sending response
// (under input mutex), threads remembers request number. Then it releases
// input mutex, locks output mutex and fetch in ring buffer all responses until
// its stored request number. The it takes correspondent element from
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut output = self.stdout.lock().unwrap();
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
while nresult < BLCKSZ.into() {
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(
&mut stdout_pollfds[..],
wal_redo_timeout.as_millis() as i32,
) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = stdout_pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
}
if out_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
}
}
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));
}
// Replace our request's response with None in `pending_responses`.
// Then make space in the ring buffer by clearing out any seqence of contiguous
// `None`'s from the front of `pending_responses`.
// NB: We can't pop_front() because other requests' responses because another
// requester might have grabbed the output mutex before us:
// T1: grab input mutex
// T1: send request_no 23
// T1: release input mutex
// T2: grab input mutex
// T2: send request_no 24
// T2: release input mutex
// T2: grab output mutex
// T2: n_processed_responses + output.pending_responses.len() <= request_no
// 23 0 24
// T2: enters poll loop that reads stdout
// T2: put response for 23 into pending_responses
// T2: put response for 24 into pending_resposnes
// pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
// T2: takes its response_24
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: releases output mutex
// T1: grabs output mutex
// T1: n_processed_responses + output.pending_responses.len() > request_no
// 23 2 23
// T1: skips poll loop that reads stdout
// T1: takes its response_23
// pending_responses now looks like this: Front None None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Back
// n_processed_responses now has value 25
let res = output.pending_responses[request_no - n_processed_responses]
.take()
.expect("we own this request_no, nobody else is supposed to take it");
while let Some(front) = output.pending_responses.front() {
if front.is_none() {
output.pending_responses.pop_front();
output.n_processed_responses += 1;
} else {
break;
}
}
Ok(res)
}
#[cfg(feature = "testing")]
fn record_and_log(&self, writebuf: &[u8]) {
use std::sync::atomic::Ordering;
let millis = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
// these files will be collected to an allure report
let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
let res = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.read(true)
.open(path)
.and_then(|mut f| f.write_all(writebuf));
// trip up allowed_errors
if let Err(e) = res {
tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
} else {
tracing::error!(filename, "erroring walredo input saved");
Process::Async(p) => p.apply_wal_records(rel, blknum, base_img, records).await,
}
}
#[cfg(not(feature = "testing"))]
fn record_and_log(&self, _: &[u8]) {}
}
impl Drop for WalRedoProcess {
fn drop(&mut self) {
self.child
.take()
.expect("we only do this once")
.kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
// no way to wait for stderr_logger_task from Drop because that is async only
pub(crate) fn id(&self) -> u32 {
match self {
Process::Sync(p) => p.id(),
Process::Async(p) => p.id(),
}
}
}

View File

@@ -0,0 +1,362 @@
use self::no_leak_child::NoLeakChild;
use crate::{
config::PageServerConf, metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER}, walrecord::NeonWalRecord, walredo::process::{no_leak_child, protocol}
};
use anyhow::Context;
use bytes::Bytes;
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use postgres_ffi::BLCKSZ;
#[cfg(feature = "testing")]
use std::sync::atomic::AtomicUsize;
use std::{
collections::VecDeque,
process::{Command, Stdio},
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, error, instrument, Instrument};
use utils::{lsn::Lsn, poison::Poison};
pub struct WalRedoProcess {
#[allow(dead_code)]
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: tokio::sync::Mutex<Poison<ProcessOutput>>,
stdin: tokio::sync::Mutex<Poison<ProcessInput>>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
}
struct ProcessInput {
stdin: tokio::process::ChildStdin,
n_requests: usize,
}
struct ProcessOutput {
stdout: tokio::process::ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
}
impl WalRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(pg_version=pg_version))]
pub(crate) fn launch(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
pg_version: u32,
) -> anyhow::Result<Self> {
crate::span::debug_assert_current_span_has_tenant_id();
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
use no_leak_child::NoLeakChildCommandExt;
// Start postgres itself
let child = Command::new(pg_bin_dir_path.join("postgres"))
// the first arg must be --wal-redo so the child process enters into walredo mode
.arg("--wal-redo")
// the child doesn't process this arg, but, having it in the argv helps indentify the
// walredo process for a particular tenant when debugging a pagserver
.args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
// NB: The redo process is not trusted after we sent it the first
// walredo work. Before that, it is trusted. Specifically, we trust
// it to
// 1. close all file descriptors except stdin, stdout, stderr because
// pageserver might not be 100% diligent in setting FD_CLOEXEC on all
// the files it opens, and
// 2. to use seccomp to sandbox itself before processing the first
// walredo request.
.spawn_no_leak_child(tenant_shard_id)
.context("spawn process")?;
WAL_REDO_PROCESS_COUNTERS.started.inc();
let mut child = scopeguard::guard(child, |child| {
error!("killing wal-redo-postgres process due to a problem during launch");
child.kill_and_wait(WalRedoKillCause::Startup);
});
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let stderr = tokio::process::ChildStderr::from_std(stderr)
.context("convert to tokio::ChildStderr")?;
let stdin =
tokio::process::ChildStdin::from_std(stdin).context("convert to tokio::ChildStdin")?;
let stdout = tokio::process::ChildStdout::from_std(stdout)
.context("convert to tokio::ChildStdout")?;
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
tokio::spawn(
async move {
scopeguard::defer! {
debug!("wal-redo-postgres stderr_logger_task finished");
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
}
debug!("wal-redo-postgres stderr_logger_task started");
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
use tokio::io::AsyncBufReadExt;
let mut stderr_lines = tokio::io::BufReader::new(stderr);
let mut buf = Vec::new();
let res = loop {
buf.clear();
// TODO we don't trust the process to cap its stderr length.
// Currently it can do unbounded Vec allocation.
match stderr_lines.read_until(b'\n', &mut buf).await {
Ok(0) => break Ok(()), // eof
Ok(num_bytes) => {
let output = String::from_utf8_lossy(&buf[..num_bytes]);
error!(%output, "received output");
}
Err(e) => {
break Err(e);
}
}
};
match res {
Ok(()) => (),
Err(e) => {
error!(error=?e, "failed to read from walredo stderr");
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
);
Ok(Self {
conf,
tenant_shard_id,
child: Some(child),
stdin: tokio::sync::Mutex::new(Poison::new(
"stdin",
ProcessInput {
stdin,
n_requests: 0,
},
)),
stdout: tokio::sync::Mutex::new(Poison::new(
"stdout",
ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
},
)),
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
})
}
pub(crate) fn id(&self) -> u32 {
self.child
.as_ref()
.expect("must not call this during Drop")
.id()
}
/// Apply given WAL records ('records') over an old page image. Returns
/// new page image.
///
/// # Cancel-Safety
///
/// Cancellation safe.
#[instrument(skip_all, level = tracing::Level::DEBUG, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
pub(crate) async fn apply_wal_records(
&self,
rel: RelTag,
blknum: u32,
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
) -> anyhow::Result<Bytes> {
let tag = protocol::BufferTag { rel, blknum };
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
//
// Most requests start with a before-image with BLCKSZ bytes, followed by
// by some other WAL records. Start with a buffer that can hold that
// comfortably.
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
protocol::build_push_page_msg(tag, img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
if let NeonWalRecord::Postgres {
will_init: _,
rec: postgres_rec,
} = rec
{
protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
}
}
protocol::build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let res = self.apply_wal_records0(&writebuf).await;
if res.is_err() {
// not all of these can be caused by this particular input, however these are so rare
// in tests so capture all.
self.record_and_log(&writebuf);
}
res
}
/// # Cancel-Safety
///
/// Cancellation safe (enforced through the use of [`utils::poison::Poison`]).
async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
let request_no = {
let mut lock_guard = self.stdin.lock().await;
let mut poison_guard = lock_guard.check_and_arm()?;
let input = poison_guard.data_mut();
input
.stdin
.write_all(writebuf)
.await
.context("write to walredo stdin")?;
let request_no = input.n_requests;
input.n_requests += 1;
poison_guard.disarm();
request_no
};
// To improve walredo performance we separate sending requests and receiving
// responses. Them are protected by different mutexes (output and input).
// If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
// then there is not warranty that T1 will first granted output mutex lock.
// To address this issue we maintain number of sent requests, number of processed
// responses and ring buffer with pending responses. After sending response
// (under input mutex), threads remembers request number. Then it releases
// input mutex, locks output mutex and fetch in ring buffer all responses until
// its stored request number. The it takes correspondent element from
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut lock_guard = self.stdout.lock().await;
let mut poison_guard = lock_guard.check_and_arm()?;
let output = poison_guard.data_mut();
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
output
.stdout
.read_exact(&mut resultbuf)
.await
.context("read walredo stdout")?;
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));
}
// Replace our request's response with None in `pending_responses`.
// Then make space in the ring buffer by clearing out any seqence of contiguous
// `None`'s from the front of `pending_responses`.
// NB: We can't pop_front() because other requests' responses because another
// requester might have grabbed the output mutex before us:
// T1: grab input mutex
// T1: send request_no 23
// T1: release input mutex
// T2: grab input mutex
// T2: send request_no 24
// T2: release input mutex
// T2: grab output mutex
// T2: n_processed_responses + output.pending_responses.len() <= request_no
// 23 0 24
// T2: enters poll loop that reads stdout
// T2: put response for 23 into pending_responses
// T2: put response for 24 into pending_resposnes
// pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
// T2: takes its response_24
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: releases output mutex
// T1: grabs output mutex
// T1: n_processed_responses + output.pending_responses.len() > request_no
// 23 2 23
// T1: skips poll loop that reads stdout
// T1: takes its response_23
// pending_responses now looks like this: Front None None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Back
// n_processed_responses now has value 25
let res = output.pending_responses[request_no - n_processed_responses]
.take()
.expect("we own this request_no, nobody else is supposed to take it");
while let Some(front) = output.pending_responses.front() {
if front.is_none() {
output.pending_responses.pop_front();
output.n_processed_responses += 1;
} else {
break;
}
}
poison_guard.disarm();
Ok(res)
}
#[cfg(feature = "testing")]
fn record_and_log(&self, writebuf: &[u8]) {
use std::sync::atomic::Ordering;
let millis = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
// these files will be collected to an allure report
let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
use std::io::Write;
let res = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.read(true)
.open(path)
.and_then(|mut f| f.write_all(writebuf));
// trip up allowed_errors
if let Err(e) = res {
tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
} else {
tracing::error!(filename, "erroring walredo input saved");
}
}
#[cfg(not(feature = "testing"))]
fn record_and_log(&self, _: &[u8]) {}
}
impl Drop for WalRedoProcess {
fn drop(&mut self) {
self.child
.take()
.expect("we only do this once")
.kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
// no way to wait for stderr_logger_task from Drop because that is async only
}
}

View File

@@ -0,0 +1,402 @@
use self::no_leak_child::NoLeakChild;
use crate::{
config::PageServerConf, metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER}, walrecord::NeonWalRecord, walredo::process::{no_leak_child, protocol}
};
use anyhow::Context;
use bytes::Bytes;
use nix::poll::{PollFd, PollFlags};
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
#[cfg(feature = "testing")]
use std::sync::atomic::AtomicUsize;
use std::{
collections::VecDeque,
io::{Read, Write},
process::{ChildStdin, ChildStdout, Command, Stdio},
sync::{Mutex, MutexGuard},
time::Duration,
};
use tracing::{debug, error, instrument, Instrument};
use utils::{lsn::Lsn, nonblock::set_nonblock};
pub struct WalRedoProcess {
#[allow(dead_code)]
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
}
struct ProcessInput {
stdin: ChildStdin,
n_requests: usize,
}
struct ProcessOutput {
stdout: ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
}
impl WalRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
#[instrument(skip_all,fields(pg_version=pg_version))]
pub(crate) fn launch(
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
pg_version: u32,
) -> anyhow::Result<Self> {
crate::span::debug_assert_current_span_has_tenant_id();
let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
use no_leak_child::NoLeakChildCommandExt;
// Start postgres itself
let child = Command::new(pg_bin_dir_path.join("postgres"))
// the first arg must be --wal-redo so the child process enters into walredo mode
.arg("--wal-redo")
// the child doesn't process this arg, but, having it in the argv helps indentify the
// walredo process for a particular tenant when debugging a pagserver
.args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir_path)
.env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
// NB: The redo process is not trusted after we sent it the first
// walredo work. Before that, it is trusted. Specifically, we trust
// it to
// 1. close all file descriptors except stdin, stdout, stderr because
// pageserver might not be 100% diligent in setting FD_CLOEXEC on all
// the files it opens, and
// 2. to use seccomp to sandbox itself before processing the first
// walredo request.
.spawn_no_leak_child(tenant_shard_id)
.context("spawn process")?;
WAL_REDO_PROCESS_COUNTERS.started.inc();
let mut child = scopeguard::guard(child, |child| {
error!("killing wal-redo-postgres process due to a problem during launch");
child.kill_and_wait(WalRedoKillCause::Startup);
});
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let stderr = tokio::process::ChildStderr::from_std(stderr)
.context("convert to tokio::ChildStderr")?;
macro_rules! set_nonblock_or_log_err {
($file:ident) => {{
let res = set_nonblock($file.as_raw_fd());
if let Err(e) = &res {
error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
}
res
}};
}
set_nonblock_or_log_err!(stdin)?;
set_nonblock_or_log_err!(stdout)?;
// all fallible operations post-spawn are complete, so get rid of the guard
let child = scopeguard::ScopeGuard::into_inner(child);
tokio::spawn(
async move {
scopeguard::defer! {
debug!("wal-redo-postgres stderr_logger_task finished");
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
}
debug!("wal-redo-postgres stderr_logger_task started");
crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
use tokio::io::AsyncBufReadExt;
let mut stderr_lines = tokio::io::BufReader::new(stderr);
let mut buf = Vec::new();
let res = loop {
buf.clear();
// TODO we don't trust the process to cap its stderr length.
// Currently it can do unbounded Vec allocation.
match stderr_lines.read_until(b'\n', &mut buf).await {
Ok(0) => break Ok(()), // eof
Ok(num_bytes) => {
let output = String::from_utf8_lossy(&buf[..num_bytes]);
error!(%output, "received output");
}
Err(e) => {
break Err(e);
}
}
};
match res {
Ok(()) => (),
Err(e) => {
error!(error=?e, "failed to read from walredo stderr");
}
}
}.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
);
Ok(Self {
conf,
tenant_shard_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdin,
n_requests: 0,
}),
stdout: Mutex::new(ProcessOutput {
stdout,
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
})
}
pub(crate) fn id(&self) -> u32 {
self.child
.as_ref()
.expect("must not call this during Drop")
.id()
}
// Apply given WAL records ('records') over an old page image. Returns
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
pub(crate) async fn apply_wal_records(
&self,
rel: RelTag,
blknum: u32,
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let tag = protocol::BufferTag { rel, blknum };
let input = self.stdin.lock().unwrap();
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
// but in practice the number of records is usually so small that it doesn't
// matter, and it's better to keep this code simple.
//
// Most requests start with a before-image with BLCKSZ bytes, followed by
// by some other WAL records. Start with a buffer that can hold that
// comfortably.
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
protocol::build_push_page_msg(tag, img, &mut writebuf);
}
for (lsn, rec) in records.iter() {
if let NeonWalRecord::Postgres {
will_init: _,
rec: postgres_rec,
} = rec
{
protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
} else {
anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
}
}
protocol::build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
if res.is_err() {
// not all of these can be caused by this particular input, however these are so rare
// in tests so capture all.
self.record_and_log(&writebuf);
}
res
}
fn apply_wal_records0(
&self,
writebuf: &[u8],
input: MutexGuard<ProcessInput>,
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
let mut nwrite = 0usize;
while nwrite < writebuf.len() {
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
let n = loop {
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If 'stdin' is writeable, do write.
let in_revents = stdin_pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
}
if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
}
}
let request_no = proc.n_requests;
proc.n_requests += 1;
drop(proc);
// To improve walredo performance we separate sending requests and receiving
// responses. Them are protected by different mutexes (output and input).
// If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
// then there is not warranty that T1 will first granted output mutex lock.
// To address this issue we maintain number of sent requests, number of processed
// responses and ring buffer with pending responses. After sending response
// (under input mutex), threads remembers request number. Then it releases
// input mutex, locks output mutex and fetch in ring buffer all responses until
// its stored request number. The it takes correspondent element from
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut output = self.stdout.lock().unwrap();
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
while nresult < BLCKSZ.into() {
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(
&mut stdout_pollfds[..],
wal_redo_timeout.as_millis() as i32,
) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = stdout_pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
}
if out_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
}
}
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));
}
// Replace our request's response with None in `pending_responses`.
// Then make space in the ring buffer by clearing out any seqence of contiguous
// `None`'s from the front of `pending_responses`.
// NB: We can't pop_front() because other requests' responses because another
// requester might have grabbed the output mutex before us:
// T1: grab input mutex
// T1: send request_no 23
// T1: release input mutex
// T2: grab input mutex
// T2: send request_no 24
// T2: release input mutex
// T2: grab output mutex
// T2: n_processed_responses + output.pending_responses.len() <= request_no
// 23 0 24
// T2: enters poll loop that reads stdout
// T2: put response for 23 into pending_responses
// T2: put response for 24 into pending_resposnes
// pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
// T2: takes its response_24
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Some(response_23) None Back
// T2: releases output mutex
// T1: grabs output mutex
// T1: n_processed_responses + output.pending_responses.len() > request_no
// 23 2 23
// T1: skips poll loop that reads stdout
// T1: takes its response_23
// pending_responses now looks like this: Front None None Back
// T2: does the while loop below
// pending_responses now looks like this: Front Back
// n_processed_responses now has value 25
let res = output.pending_responses[request_no - n_processed_responses]
.take()
.expect("we own this request_no, nobody else is supposed to take it");
while let Some(front) = output.pending_responses.front() {
if front.is_none() {
output.pending_responses.pop_front();
output.n_processed_responses += 1;
} else {
break;
}
}
Ok(res)
}
#[cfg(feature = "testing")]
fn record_and_log(&self, writebuf: &[u8]) {
use std::sync::atomic::Ordering;
let millis = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis();
let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
// these files will be collected to an allure report
let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
let res = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.read(true)
.open(path)
.and_then(|mut f| f.write_all(writebuf));
// trip up allowed_errors
if let Err(e) = res {
tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
} else {
tracing::error!(filename, "erroring walredo input saved");
}
}
#[cfg(not(feature = "testing"))]
fn record_and_log(&self, _: &[u8]) {}
}
impl Drop for WalRedoProcess {
fn drop(&mut self) {
self.child
.take()
.expect("we only do this once")
.kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
// no way to wait for stderr_logger_task from Drop because that is async only
}
}

View File

@@ -8,6 +8,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
@@ -27,10 +28,16 @@ def test_many_small_tenants(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
_env = setup_env(neon_env_builder, 2) # vary this to the desired number of tenants
_pg_bin = pg_bin
# drop into pdb so that we can debug pageserver interactively, use pdb here
page_cache_size = int(70 * (1024**3) / 8192)
max_file_descriptors = 500000
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
)
_env = setup_env(neon_env_builder, 3000, pg_bin) # vary this to the desired number of tenants
# drop into pdb so that we can debug pageserver interactively, use pdb here
# For example, to interactively examine pageserver startup behavior, call
# _env.pageserver.stop(immediate=True)
# _env.pageserver.start()
@@ -41,6 +48,7 @@ def test_many_small_tenants(
def setup_env(
neon_env_builder: NeonEnvBuilder,
n_tenants: int,
pg_bin: PgBin,
) -> NeonEnv:
def setup_template(env: NeonEnv):
# create our template tenant
@@ -60,12 +68,38 @@ def setup_env(
".*Dropped remote consistent LSN updates.*",
)
env.pageserver.tenant_attach(template_tenant, config)
ep = env.endpoints.create_start("main", tenant_id=template_tenant)
ep.safe_psql("create table foo(b text)")
for _ in range(0, 8):
ep.safe_psql("insert into foo(b) values ('some text')")
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
ep.stop_and_destroy()
ps_http = env.pageserver.http_client()
scale = 10
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
for _ in range(
0, 17
): # some prime number to avoid potential resonances with the "_threshold" variables from the config
# the L0s produced by this appear to have size ~5MiB
num_txns = 10_000
pg_bin.run_capture(
["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()]
)
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
ps_http.timeline_checkpoint(template_tenant, template_timeline)
ps_http.timeline_compact(template_tenant, template_timeline)
# for reference, the output at scale=6 looked like so (306M total)
# ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59
# total 306M
# 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829
# 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919
# 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71
# 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791
# 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1
# 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9
# 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639
# 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799
# 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19
# 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021
ep.stop_and_destroy()
return (template_tenant, template_timeline, config)
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv: