mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 20:20:38 +00:00
Compare commits
68 Commits
parameteri
...
problame/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bec00d17c3 | ||
|
|
6c5c2c80d2 | ||
|
|
c9e8ae40dd | ||
|
|
0047481cc8 | ||
|
|
feaca7bc1e | ||
|
|
5d12475664 | ||
|
|
6e0a02de27 | ||
|
|
afd2c1369e | ||
|
|
b44bbd276e | ||
|
|
aad5a672f0 | ||
|
|
5565087dba | ||
|
|
825c0e30e8 | ||
|
|
f489a10509 | ||
|
|
bea2e121dd | ||
|
|
82e7e4d84a | ||
|
|
4ef2fb29fa | ||
|
|
ffef90f3db | ||
|
|
d8a926618e | ||
|
|
c38b3e6ad6 | ||
|
|
b72891d28c | ||
|
|
5efaddea02 | ||
|
|
aa5439cb6e | ||
|
|
dc8e318a42 | ||
|
|
871a3caca9 | ||
|
|
edd7f69c2d | ||
|
|
70fb7e3580 | ||
|
|
6b820bb423 | ||
|
|
740efb0ab5 | ||
|
|
5cf45df692 | ||
|
|
3779854f12 | ||
|
|
dc03f7a44f | ||
|
|
43cf9d10d2 | ||
|
|
31d4d1e233 | ||
|
|
c77ce7cdc3 | ||
|
|
67a7abc7cf | ||
|
|
cca66e5e82 | ||
|
|
655d3b6468 | ||
|
|
3dfc7de99b | ||
|
|
3a5994bb43 | ||
|
|
86b0df973a | ||
|
|
e669b6d852 | ||
|
|
b2f5b84c2f | ||
|
|
a21409b509 | ||
|
|
c6a74bd17f | ||
|
|
d6c45625e6 | ||
|
|
db3333eecb | ||
|
|
15cfa7b6e9 | ||
|
|
8677136bc8 | ||
|
|
a37d713f2b | ||
|
|
081af38076 | ||
|
|
929423cf68 | ||
|
|
48b22bd057 | ||
|
|
80de8567dc | ||
|
|
f038304274 | ||
|
|
c853c61c11 | ||
|
|
f31f2e9e24 | ||
|
|
cd6d9abafb | ||
|
|
0cf5619e4a | ||
|
|
96413743e4 | ||
|
|
b1b8ca32c8 | ||
|
|
70b37cf88f | ||
|
|
4160d407fb | ||
|
|
a29ac8b8a2 | ||
|
|
639ed3cb3c | ||
|
|
2736f61604 | ||
|
|
8012b80a45 | ||
|
|
a93be15360 | ||
|
|
2c1652a02b |
@@ -91,6 +91,8 @@ pub mod zstd;
|
||||
|
||||
pub mod env;
|
||||
|
||||
pub mod poison;
|
||||
|
||||
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
|
||||
///
|
||||
/// we have several cases:
|
||||
|
||||
121
libs/utils/src/poison.rs
Normal file
121
libs/utils/src/poison.rs
Normal file
@@ -0,0 +1,121 @@
|
||||
//! Protect a piece of state from reuse after it is left in an inconsistent state.
|
||||
//!
|
||||
//! # Example
|
||||
//!
|
||||
//! ```
|
||||
//! # tokio_test::block_on(async {
|
||||
//! use utils::poison::Poison;
|
||||
//! use std::time::Duration;
|
||||
//!
|
||||
//! struct State {
|
||||
//! clean: bool,
|
||||
//! }
|
||||
//! let state = tokio::sync::Mutex::new(Poison::new("mystate", State { clean: true }));
|
||||
//!
|
||||
//! let mut mutex_guard = state.lock().await;
|
||||
//! let mut poison_guard = mutex_guard.check_and_arm()?;
|
||||
//! let state = poison_guard.data_mut();
|
||||
//! state.clean = false;
|
||||
//! // If we get cancelled at this await point, subsequent check_and_arm() calls will fail.
|
||||
//! tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
//! state.clean = true;
|
||||
//! poison_guard.disarm();
|
||||
//! # Ok::<(), utils::poison::Error>(())
|
||||
//! # });
|
||||
//! ```
|
||||
|
||||
use tracing::warn;
|
||||
|
||||
pub struct Poison<T> {
|
||||
what: &'static str,
|
||||
state: State,
|
||||
data: T,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
enum State {
|
||||
Clean,
|
||||
Armed,
|
||||
Poisoned { at: chrono::DateTime<chrono::Utc> },
|
||||
}
|
||||
|
||||
impl<T> Poison<T> {
|
||||
/// We log `what` `warning!` level if the [`Guard`] gets dropped without being [`Guard::disarm`]ed.
|
||||
pub fn new(what: &'static str, data: T) -> Self {
|
||||
Self {
|
||||
what,
|
||||
state: State::Clean,
|
||||
data,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check for poisoning and return a [`Guard`] that provides access to the wrapped state.
|
||||
pub fn check_and_arm(&mut self) -> Result<Guard<T>, Error> {
|
||||
match self.state {
|
||||
State::Clean => {
|
||||
self.state = State::Armed;
|
||||
Ok(Guard(self))
|
||||
}
|
||||
State::Armed => unreachable!("transient state"),
|
||||
State::Poisoned { at } => Err(Error::Poisoned {
|
||||
what: self.what,
|
||||
at,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Use [`Self::data`] and [`Self::data_mut`] to access the wrapped state.
|
||||
/// Once modifications are done, use [`Self::disarm`].
|
||||
/// If [`Guard`] gets dropped instead of calling [`Self::disarm`], the state is poisoned
|
||||
/// and subsequent calls to [`Poison::check_and_arm`] will fail with an error.
|
||||
pub struct Guard<'a, T>(&'a mut Poison<T>);
|
||||
|
||||
impl<'a, T> Guard<'a, T> {
|
||||
pub fn data(&self) -> &T {
|
||||
&self.0.data
|
||||
}
|
||||
pub fn data_mut(&mut self) -> &mut T {
|
||||
&mut self.0.data
|
||||
}
|
||||
|
||||
pub fn disarm(self) {
|
||||
match self.0.state {
|
||||
State::Clean => unreachable!("we set it to Armed in check_and_arm()"),
|
||||
State::Armed => {
|
||||
self.0.state = State::Clean;
|
||||
}
|
||||
State::Poisoned { at } => {
|
||||
unreachable!("we fail check_and_arm() if it's in that state: {at}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T> Drop for Guard<'a, T> {
|
||||
fn drop(&mut self) {
|
||||
match self.0.state {
|
||||
State::Clean => {
|
||||
// set by disarm()
|
||||
}
|
||||
State::Armed => {
|
||||
// still armed => poison it
|
||||
let at = chrono::Utc::now();
|
||||
self.0.state = State::Poisoned { at };
|
||||
warn!(at=?at, "poisoning {}", self.0.what);
|
||||
}
|
||||
State::Poisoned { at } => {
|
||||
unreachable!("we fail check_and_arm() if it's in that state: {at}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("poisoned at {at}: {what}")]
|
||||
Poisoned {
|
||||
what: &'static str,
|
||||
at: chrono::DateTime<chrono::Utc>,
|
||||
},
|
||||
}
|
||||
@@ -27,25 +27,25 @@
|
||||
//!
|
||||
//! # Reference Numbers
|
||||
//!
|
||||
//! 2024-04-04 on i3en.3xlarge
|
||||
//! 2024-04-10 on i3en.3xlarge
|
||||
//!
|
||||
//! ```text
|
||||
//! short/1 time: [25.925 µs 26.060 µs 26.209 µs]
|
||||
//! short/2 time: [31.277 µs 31.483 µs 31.722 µs]
|
||||
//! short/4 time: [45.496 µs 45.831 µs 46.182 µs]
|
||||
//! short/8 time: [84.298 µs 84.920 µs 85.566 µs]
|
||||
//! short/16 time: [185.04 µs 186.41 µs 187.88 µs]
|
||||
//! short/32 time: [385.01 µs 386.77 µs 388.70 µs]
|
||||
//! short/64 time: [770.24 µs 773.04 µs 776.04 µs]
|
||||
//! short/128 time: [1.5017 ms 1.5064 ms 1.5113 ms]
|
||||
//! medium/1 time: [106.65 µs 107.20 µs 107.85 µs]
|
||||
//! medium/2 time: [153.28 µs 154.24 µs 155.56 µs]
|
||||
//! medium/4 time: [325.67 µs 327.01 µs 328.71 µs]
|
||||
//! medium/8 time: [646.82 µs 650.17 µs 653.91 µs]
|
||||
//! medium/16 time: [1.2645 ms 1.2701 ms 1.2762 ms]
|
||||
//! medium/32 time: [2.4409 ms 2.4550 ms 2.4692 ms]
|
||||
//! medium/64 time: [4.6814 ms 4.7114 ms 4.7408 ms]
|
||||
//! medium/128 time: [8.7790 ms 8.9037 ms 9.0282 ms]
|
||||
//! short/1 time: [25.197 µs 25.322 µs 25.471 µs]
|
||||
//! short/2 time: [34.661 µs 34.810 µs 34.994 µs]
|
||||
//! short/4 time: [45.369 µs 45.686 µs 46.036 µs]
|
||||
//! short/8 time: [76.653 µs 77.234 µs 77.901 µs]
|
||||
//! short/16 time: [140.61 µs 142.45 µs 144.33 µs]
|
||||
//! short/32 time: [275.59 µs 277.42 µs 279.32 µs]
|
||||
//! short/64 time: [545.53 µs 547.73 µs 550.05 µs]
|
||||
//! short/128 time: [1.0531 ms 1.0606 ms 1.0693 ms]
|
||||
//! medium/1 time: [114.98 µs 115.58 µs 116.29 µs]
|
||||
//! medium/2 time: [159.51 µs 160.12 µs 160.89 µs]
|
||||
//! medium/4 time: [330.33 µs 334.07 µs 338.96 µs]
|
||||
//! medium/8 time: [660.55 µs 666.29 µs 673.20 µs]
|
||||
//! medium/16 time: [1.3094 ms 1.3210 ms 1.3347 ms]
|
||||
//! medium/32 time: [2.5856 ms 2.6100 ms 2.6387 ms]
|
||||
//! medium/64 time: [4.9600 ms 5.0052 ms 5.0545 ms]
|
||||
//! medium/128 time: [9.2382 ms 9.4334 ms 9.6216 ms]
|
||||
//! ```
|
||||
|
||||
use bytes::{Buf, Bytes};
|
||||
@@ -126,7 +126,7 @@ fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration
|
||||
}
|
||||
|
||||
rt.block_on(async move {
|
||||
let mut total_wallclock_time = std::time::Duration::from_millis(0);
|
||||
let mut total_wallclock_time = Duration::ZERO;
|
||||
while let Some(res) = tasks.join_next().await {
|
||||
total_wallclock_time += res.unwrap();
|
||||
}
|
||||
|
||||
@@ -3098,15 +3098,7 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Option<(Lsn, Bytes)> {
|
||||
let cache = page_cache::get();
|
||||
|
||||
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
|
||||
// We should look at the key to determine if it's a cacheable object
|
||||
let (lsn, read_guard) = cache
|
||||
.lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
|
||||
.await?;
|
||||
let img = Bytes::from(read_guard.to_vec());
|
||||
Some((lsn, img))
|
||||
return None;
|
||||
}
|
||||
|
||||
async fn get_ready_ancestor_timeline(
|
||||
|
||||
@@ -239,6 +239,7 @@ impl PostgresRedoManager {
|
||||
// Relational WAL records are applied using wal-redo-postgres
|
||||
let result = proc
|
||||
.apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
|
||||
.await
|
||||
.context("apply_wal_records");
|
||||
|
||||
let duration = started_at.elapsed();
|
||||
|
||||
@@ -6,21 +6,18 @@ use crate::{
|
||||
};
|
||||
use anyhow::Context;
|
||||
use bytes::Bytes;
|
||||
use nix::poll::{PollFd, PollFlags};
|
||||
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use std::os::fd::AsRawFd;
|
||||
#[cfg(feature = "testing")]
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
io::{Read, Write},
|
||||
process::{ChildStdin, ChildStdout, Command, Stdio},
|
||||
sync::{Mutex, MutexGuard},
|
||||
process::{Command, Stdio},
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tracing::{debug, error, instrument, Instrument};
|
||||
use utils::{lsn::Lsn, nonblock::set_nonblock};
|
||||
use utils::{lsn::Lsn, poison::Poison};
|
||||
|
||||
mod no_leak_child;
|
||||
/// The IPC protocol that pageserver and walredo process speak over their shared pipe.
|
||||
@@ -32,20 +29,20 @@ pub struct WalRedoProcess {
|
||||
tenant_shard_id: TenantShardId,
|
||||
// Some() on construction, only becomes None on Drop.
|
||||
child: Option<NoLeakChild>,
|
||||
stdout: Mutex<ProcessOutput>,
|
||||
stdin: Mutex<ProcessInput>,
|
||||
stdout: tokio::sync::Mutex<Poison<ProcessOutput>>,
|
||||
stdin: tokio::sync::Mutex<Poison<ProcessInput>>,
|
||||
/// Counter to separate same sized walredo inputs failing at the same millisecond.
|
||||
#[cfg(feature = "testing")]
|
||||
dump_sequence: AtomicUsize,
|
||||
}
|
||||
|
||||
struct ProcessInput {
|
||||
stdin: ChildStdin,
|
||||
stdin: tokio::process::ChildStdin,
|
||||
n_requests: usize,
|
||||
}
|
||||
|
||||
struct ProcessOutput {
|
||||
stdout: ChildStdout,
|
||||
stdout: tokio::process::ChildStdout,
|
||||
pending_responses: VecDeque<Option<Bytes>>,
|
||||
n_processed_responses: usize,
|
||||
}
|
||||
@@ -100,17 +97,10 @@ impl WalRedoProcess {
|
||||
let stderr = child.stderr.take().unwrap();
|
||||
let stderr = tokio::process::ChildStderr::from_std(stderr)
|
||||
.context("convert to tokio::ChildStderr")?;
|
||||
macro_rules! set_nonblock_or_log_err {
|
||||
($file:ident) => {{
|
||||
let res = set_nonblock($file.as_raw_fd());
|
||||
if let Err(e) = &res {
|
||||
error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
|
||||
}
|
||||
res
|
||||
}};
|
||||
}
|
||||
set_nonblock_or_log_err!(stdin)?;
|
||||
set_nonblock_or_log_err!(stdout)?;
|
||||
let stdin =
|
||||
tokio::process::ChildStdin::from_std(stdin).context("convert to tokio::ChildStdin")?;
|
||||
let stdout = tokio::process::ChildStdout::from_std(stdout)
|
||||
.context("convert to tokio::ChildStdout")?;
|
||||
|
||||
// all fallible operations post-spawn are complete, so get rid of the guard
|
||||
let child = scopeguard::ScopeGuard::into_inner(child);
|
||||
@@ -155,15 +145,21 @@ impl WalRedoProcess {
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
child: Some(child),
|
||||
stdin: Mutex::new(ProcessInput {
|
||||
stdin,
|
||||
n_requests: 0,
|
||||
}),
|
||||
stdout: Mutex::new(ProcessOutput {
|
||||
stdout,
|
||||
pending_responses: VecDeque::new(),
|
||||
n_processed_responses: 0,
|
||||
}),
|
||||
stdin: tokio::sync::Mutex::new(Poison::new(
|
||||
"stdin",
|
||||
ProcessInput {
|
||||
stdin,
|
||||
n_requests: 0,
|
||||
},
|
||||
)),
|
||||
stdout: tokio::sync::Mutex::new(Poison::new(
|
||||
"stdout",
|
||||
ProcessOutput {
|
||||
stdout,
|
||||
pending_responses: VecDeque::new(),
|
||||
n_processed_responses: 0,
|
||||
},
|
||||
)),
|
||||
#[cfg(feature = "testing")]
|
||||
dump_sequence: AtomicUsize::default(),
|
||||
})
|
||||
@@ -176,11 +172,14 @@ impl WalRedoProcess {
|
||||
.id()
|
||||
}
|
||||
|
||||
// Apply given WAL records ('records') over an old page image. Returns
|
||||
// new page image.
|
||||
//
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
|
||||
pub(crate) fn apply_wal_records(
|
||||
/// Apply given WAL records ('records') over an old page image. Returns
|
||||
/// new page image.
|
||||
///
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// Cancellation safe.
|
||||
#[instrument(skip_all, level = tracing::Level::DEBUG, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
|
||||
pub(crate) async fn apply_wal_records(
|
||||
&self,
|
||||
rel: RelTag,
|
||||
blknum: u32,
|
||||
@@ -189,7 +188,6 @@ impl WalRedoProcess {
|
||||
wal_redo_timeout: Duration,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let tag = protocol::BufferTag { rel, blknum };
|
||||
let input = self.stdin.lock().unwrap();
|
||||
|
||||
// Serialize all the messages to send the WAL redo process first.
|
||||
//
|
||||
@@ -219,7 +217,11 @@ impl WalRedoProcess {
|
||||
protocol::build_get_page_msg(tag, &mut writebuf);
|
||||
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
|
||||
|
||||
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
|
||||
let Ok(res) =
|
||||
tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await
|
||||
else {
|
||||
anyhow::bail!("WAL redo timed out");
|
||||
};
|
||||
|
||||
if res.is_err() {
|
||||
// not all of these can be caused by this particular input, however these are so rare
|
||||
@@ -230,41 +232,27 @@ impl WalRedoProcess {
|
||||
res
|
||||
}
|
||||
|
||||
fn apply_wal_records0(
|
||||
&self,
|
||||
writebuf: &[u8],
|
||||
input: MutexGuard<ProcessInput>,
|
||||
wal_redo_timeout: Duration,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
|
||||
let mut nwrite = 0usize;
|
||||
|
||||
while nwrite < writebuf.len() {
|
||||
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
|
||||
let n = loop {
|
||||
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
|
||||
if n == 0 {
|
||||
anyhow::bail!("WAL redo timed out");
|
||||
}
|
||||
|
||||
// If 'stdin' is writeable, do write.
|
||||
let in_revents = stdin_pollfds[0].revents().unwrap();
|
||||
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
|
||||
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
|
||||
}
|
||||
if in_revents.contains(PollFlags::POLLHUP) {
|
||||
// We still have more data to write, but the process closed the pipe.
|
||||
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
|
||||
}
|
||||
}
|
||||
let request_no = proc.n_requests;
|
||||
proc.n_requests += 1;
|
||||
drop(proc);
|
||||
/// # Cancel-Safety
|
||||
///
|
||||
/// When not polled to completion (e.g. because in `tokio::select!` another
|
||||
/// branch becomes ready before this future), concurrent and subsequent
|
||||
/// calls may fail due to [`utils::poison::Poison::check_and_arm`] calls.
|
||||
/// Dispose of this process instance and create a new one.
|
||||
async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
|
||||
let request_no = {
|
||||
let mut lock_guard = self.stdin.lock().await;
|
||||
let mut poison_guard = lock_guard.check_and_arm()?;
|
||||
let input = poison_guard.data_mut();
|
||||
input
|
||||
.stdin
|
||||
.write_all(writebuf)
|
||||
.await
|
||||
.context("write to walredo stdin")?;
|
||||
let request_no = input.n_requests;
|
||||
input.n_requests += 1;
|
||||
poison_guard.disarm();
|
||||
request_no
|
||||
};
|
||||
|
||||
// To improve walredo performance we separate sending requests and receiving
|
||||
// responses. They are protected by different mutexes (output and input).
|
||||
@@ -278,40 +266,19 @@ impl WalRedoProcess {
|
||||
// pending responses ring buffer and truncate all empty elements from the front,
|
||||
// advancing processed responses number.
|
||||
|
||||
let mut output = self.stdout.lock().unwrap();
|
||||
let mut lock_guard = self.stdout.lock().await;
|
||||
let mut poison_guard = lock_guard.check_and_arm()?;
|
||||
let output = poison_guard.data_mut();
|
||||
let n_processed_responses = output.n_processed_responses;
|
||||
while n_processed_responses + output.pending_responses.len() <= request_no {
|
||||
// We expect the WAL redo process to respond with an 8k page image. We read it
|
||||
// into this buffer.
|
||||
let mut resultbuf = vec![0; BLCKSZ.into()];
|
||||
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
|
||||
while nresult < BLCKSZ.into() {
|
||||
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
|
||||
// We do two things simultaneously: reading response from stdout
|
||||
// and forward any logging information that the child writes to its stderr to the page server's log.
|
||||
let n = loop {
|
||||
match nix::poll::poll(
|
||||
&mut stdout_pollfds[..],
|
||||
wal_redo_timeout.as_millis() as i32,
|
||||
) {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
|
||||
if n == 0 {
|
||||
anyhow::bail!("WAL redo timed out");
|
||||
}
|
||||
|
||||
// If we have some data in stdout, read it to the result buffer.
|
||||
let out_revents = stdout_pollfds[0].revents().unwrap();
|
||||
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
|
||||
}
|
||||
if out_revents.contains(PollFlags::POLLHUP) {
|
||||
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
|
||||
}
|
||||
}
|
||||
output
|
||||
.stdout
|
||||
.read_exact(&mut resultbuf)
|
||||
.await
|
||||
.context("read walredo stdout")?;
|
||||
output
|
||||
.pending_responses
|
||||
.push_back(Some(Bytes::from(resultbuf)));
|
||||
@@ -359,6 +326,7 @@ impl WalRedoProcess {
|
||||
break;
|
||||
}
|
||||
}
|
||||
poison_guard.disarm();
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
@@ -378,6 +346,7 @@ impl WalRedoProcess {
|
||||
|
||||
let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
|
||||
|
||||
use std::io::Write;
|
||||
let res = std::fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
|
||||
@@ -8,6 +8,7 @@ from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
last_flush_lsn_upload,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
|
||||
from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
|
||||
@@ -27,10 +28,16 @@ def test_many_small_tenants(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_bin: PgBin,
|
||||
):
|
||||
_env = setup_env(neon_env_builder, 2) # vary this to the desired number of tenants
|
||||
_pg_bin = pg_bin
|
||||
|
||||
# drop into pdb so that we can debug pageserver interactively, use pdb here
|
||||
page_cache_size = int(40 * (1024**3) / 8192)
|
||||
max_file_descriptors = 500000
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
|
||||
)
|
||||
|
||||
_env = setup_env(neon_env_builder, 3000, pg_bin) # vary this to the desired number of tenants
|
||||
|
||||
# drop into pdb so that we can debug pageserver interactively, use pdb here
|
||||
# For example, to interactively examine pageserver startup behavior, call
|
||||
# _env.pageserver.stop(immediate=True)
|
||||
# _env.pageserver.start()
|
||||
@@ -41,6 +48,7 @@ def test_many_small_tenants(
|
||||
def setup_env(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
n_tenants: int,
|
||||
pg_bin: PgBin,
|
||||
) -> NeonEnv:
|
||||
def setup_template(env: NeonEnv):
|
||||
# create our template tenant
|
||||
@@ -60,12 +68,38 @@ def setup_env(
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
env.pageserver.tenant_attach(template_tenant, config)
|
||||
ep = env.endpoints.create_start("main", tenant_id=template_tenant)
|
||||
ep.safe_psql("create table foo(b text)")
|
||||
for _ in range(0, 8):
|
||||
ep.safe_psql("insert into foo(b) values ('some text')")
|
||||
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
|
||||
ep.stop_and_destroy()
|
||||
ps_http = env.pageserver.http_client()
|
||||
scale = 10
|
||||
with env.endpoints.create_start("main", tenant_id=template_tenant) as ep:
|
||||
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", "-I", "dtGvp", ep.connstr()])
|
||||
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
|
||||
ps_http.timeline_checkpoint(template_tenant, template_timeline)
|
||||
ps_http.timeline_compact(template_tenant, template_timeline)
|
||||
for _ in range(
|
||||
0, 17
|
||||
): # some prime number to avoid potential resonances with the "_threshold" variables from the config
|
||||
# the L0s produced by this appear to have size ~5MiB
|
||||
num_txns = 10_000
|
||||
pg_bin.run_capture(
|
||||
["pgbench", "-N", "-c1", "--transactions", f"{num_txns}", ep.connstr()]
|
||||
)
|
||||
wait_for_last_flush_lsn(env, ep, template_tenant, template_timeline)
|
||||
ps_http.timeline_checkpoint(template_tenant, template_timeline)
|
||||
ps_http.timeline_compact(template_tenant, template_timeline)
|
||||
# for reference, the output at scale=6 looked like so (306M total)
|
||||
# ls -sh test_output/shared-snapshots/max_throughput_latest_lsn-2-6/snapshot/pageserver_1/tenants/35c30b88ea16a7a09f82d9c6a115551b/timelines/da902b378eebe83dc8a4e81cd3dc1c59
|
||||
# total 306M
|
||||
# 188M 000000000000000000000000000000000000-030000000000000000000000000000000003__000000000149F060-0000000009E75829
|
||||
# 4.5M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000009E75829-000000000A21E919
|
||||
# 33M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000A21E919-000000000C20CB71
|
||||
# 36M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000C20CB71-000000000E470791
|
||||
# 16M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000E470791-000000000F34AEF1
|
||||
# 8.2M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000F34AEF1-000000000FABA8A9
|
||||
# 6.0M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FABA8A9-000000000FFE0639
|
||||
# 6.1M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000000FFE0639-000000001051D799
|
||||
# 4.7M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__000000001051D799-0000000010908F19
|
||||
# 4.6M 000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000010908F19-0000000010CD3021
|
||||
ep.stop_and_destroy()
|
||||
return (template_tenant, template_timeline, config)
|
||||
|
||||
def doit(neon_env_builder: NeonEnvBuilder) -> NeonEnv:
|
||||
|
||||
Reference in New Issue
Block a user