walredo: make request_redo() an async fn (#5559)

Stacked atop https://github.com/neondatabase/neon/pull/5557
Prep work for https://github.com/neondatabase/neon/pull/5560

These changes have a 2% impact on `bench_walredo`.
That's likely because of the `block_on() in the innermost piece of
benchmark-only code.
So, it doesn't affect production code.
The use of closures in the benchmarking code prevents a straightforward
conversion of the whole benchmarking code to async.

before:

```
    $ cargo bench --features testing --bench bench_walredo
       Compiling pageserver v0.1.0 (/home/cs/src/neon/pageserver)
        Finished bench [optimized + debuginfo] target(s) in 2m 11s
         Running benches/bench_walredo.rs (target/release/deps/bench_walredo-d99a324337dead70)
    Gnuplot not found, using plotters backend
    short/short/1           time:   [26.363 µs 27.451 µs 28.573 µs]
    Found 1 outliers among 100 measurements (1.00%)
      1 (1.00%) high mild
    short/short/2           time:   [64.340 µs 64.927 µs 65.485 µs]
    Found 2 outliers among 100 measurements (2.00%)
      2 (2.00%) low mild
    short/short/4           time:   [101.98 µs 104.06 µs 106.13 µs]
    short/short/8           time:   [151.42 µs 152.74 µs 154.03 µs]
    short/short/16          time:   [296.30 µs 297.53 µs 298.88 µs]
    Found 14 outliers among 100 measurements (14.00%)
      10 (10.00%) high mild
      4 (4.00%) high severe

    medium/medium/1         time:   [225.12 µs 225.90 µs 226.66 µs]
    Found 1 outliers among 100 measurements (1.00%)
      1 (1.00%) low mild
    medium/medium/2         time:   [490.80 µs 491.64 µs 492.49 µs]
    Found 1 outliers among 100 measurements (1.00%)
      1 (1.00%) low mild
    medium/medium/4         time:   [934.47 µs 936.49 µs 938.52 µs]
    Found 5 outliers among 100 measurements (5.00%)
      3 (3.00%) low mild
      1 (1.00%) high mild
      1 (1.00%) high severe
    medium/medium/8         time:   [1.8364 ms 1.8412 ms 1.8463 ms]
    Found 4 outliers among 100 measurements (4.00%)
      4 (4.00%) high mild
    medium/medium/16        time:   [3.6694 ms 3.6896 ms 3.7104 ms]

```

after:

```

    $ cargo bench --features testing --bench bench_walredo
       Compiling pageserver v0.1.0 (/home/cs/src/neon/pageserver)
        Finished bench [optimized + debuginfo] target(s) in 2m 11s
         Running benches/bench_walredo.rs (target/release/deps/bench_walredo-d99a324337dead70)
    Gnuplot not found, using plotters backend
    short/short/1           time:   [28.345 µs 28.529 µs 28.699 µs]
                            change: [-0.2201% +3.9276% +8.2451%] (p = 0.07 > 0.05)
                            No change in performance detected.
    Found 17 outliers among 100 measurements (17.00%)
      4 (4.00%) low severe
      5 (5.00%) high mild
      8 (8.00%) high severe
    short/short/2           time:   [66.145 µs 66.719 µs 67.274 µs]
                            change: [+1.5467% +2.7605% +3.9927%] (p = 0.00 < 0.05)
                            Performance has regressed.
    Found 5 outliers among 100 measurements (5.00%)
      5 (5.00%) low mild
    short/short/4           time:   [105.51 µs 107.52 µs 109.49 µs]
                            change: [+0.5023% +3.3196% +6.1986%] (p = 0.02 < 0.05)
                            Change within noise threshold.
    short/short/8           time:   [151.90 µs 153.16 µs 154.41 µs]
                            change: [-1.0001% +0.2779% +1.4221%] (p = 0.65 > 0.05)
                            No change in performance detected.
    short/short/16          time:   [297.38 µs 298.26 µs 299.20 µs]
                            change: [-0.2953% +0.2462% +0.7763%] (p = 0.37 > 0.05)
                            No change in performance detected.
    Found 2 outliers among 100 measurements (2.00%)
      2 (2.00%) high mild

    medium/medium/1         time:   [229.76 µs 230.72 µs 231.69 µs]
                            change: [+1.5804% +2.1354% +2.6635%] (p = 0.00 < 0.05)
                            Performance has regressed.
    medium/medium/2         time:   [501.14 µs 502.31 µs 503.64 µs]
                            change: [+1.8730% +2.1709% +2.5199%] (p = 0.00 < 0.05)
                            Performance has regressed.
    Found 7 outliers among 100 measurements (7.00%)
      1 (1.00%) low mild
      1 (1.00%) high mild
      5 (5.00%) high severe
    medium/medium/4         time:   [954.15 µs 956.74 µs 959.33 µs]
                            change: [+1.7962% +2.1627% +2.4905%] (p = 0.00 < 0.05)
                            Performance has regressed.
    medium/medium/8         time:   [1.8726 ms 1.8785 ms 1.8848 ms]
                            change: [+1.5858% +2.0240% +2.4626%] (p = 0.00 < 0.05)
                            Performance has regressed.
    Found 6 outliers among 100 measurements (6.00%)
      1 (1.00%) low mild
      3 (3.00%) high mild
      2 (2.00%) high severe
    medium/medium/16        time:   [3.7565 ms 3.7746 ms 3.7934 ms]
                            change: [+1.5503% +2.3044% +3.0818%] (p = 0.00 < 0.05)
                            Performance has regressed.
    Found 3 outliers among 100 measurements (3.00%)
      3 (3.00%) high mild
```
This commit is contained in:
Christian Schwarz
2023-10-18 12:23:06 +02:00
committed by GitHub
parent 16c87b5bda
commit 9da67c4f19
5 changed files with 116 additions and 69 deletions

View File

@@ -32,9 +32,15 @@ fn redo_scenarios(c: &mut Criterion) {
let manager = Arc::new(manager);
tracing::info!("executing first");
short().execute(&manager).unwrap();
tracing::info!("first executed");
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
tracing::info!("executing first");
short().execute(rt.handle(), &manager).unwrap();
tracing::info!("first executed");
}
let thread_counts = [1, 2, 4, 8, 16];
@@ -77,9 +83,14 @@ fn add_multithreaded_walredo_requesters(
assert_ne!(threads, 0);
if threads == 1 {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let handle = rt.handle();
b.iter_batched_ref(
|| Some(input_factory()),
|input| execute_all(input.take(), manager),
|input| execute_all(input.take(), handle, manager),
criterion::BatchSize::PerIteration,
);
} else {
@@ -95,19 +106,26 @@ fn add_multithreaded_walredo_requesters(
let manager = manager.clone();
let barrier = barrier.clone();
let work_rx = work_rx.clone();
move || loop {
// queue up and wait if we want to go another round
if work_rx.lock().unwrap().recv().is_err() {
break;
move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let handle = rt.handle();
loop {
// queue up and wait if we want to go another round
if work_rx.lock().unwrap().recv().is_err() {
break;
}
let input = Some(input_factory());
barrier.wait();
execute_all(input, handle, &manager).unwrap();
barrier.wait();
}
let input = Some(input_factory());
barrier.wait();
execute_all(input, &manager).unwrap();
barrier.wait();
}
})
})
@@ -149,13 +167,17 @@ impl Drop for JoinOnDrop {
}
}
fn execute_all<I>(input: I, manager: &PostgresRedoManager) -> anyhow::Result<()>
fn execute_all<I>(
input: I,
handle: &tokio::runtime::Handle,
manager: &PostgresRedoManager,
) -> anyhow::Result<()>
where
I: IntoIterator<Item = Request>,
{
// just fire all requests as fast as possible
input.into_iter().try_for_each(|req| {
let page = req.execute(manager)?;
let page = req.execute(handle, manager)?;
assert_eq!(page.remaining(), 8192);
anyhow::Ok(())
})
@@ -470,9 +492,11 @@ struct Request {
}
impl Request {
fn execute(self, manager: &PostgresRedoManager) -> anyhow::Result<Bytes> {
use pageserver::walredo::WalRedoManager;
fn execute(
self,
rt: &tokio::runtime::Handle,
manager: &PostgresRedoManager,
) -> anyhow::Result<Bytes> {
let Request {
key,
lsn,
@@ -481,6 +505,6 @@ impl Request {
pg_version,
} = self;
manager.request_redo(key, lsn, base_img, records, pg_version)
rt.block_on(manager.request_redo(key, lsn, base_img, records, pg_version))
}
}

View File

@@ -1298,10 +1298,6 @@ pub(crate) mod mock {
}
}
pub fn get_executed(&self) -> usize {
self.executed.load(Ordering::Relaxed)
}
#[allow(clippy::await_holding_lock)]
pub async fn pump(&self) {
if let Some(remote_storage) = &self.remote_storage {

View File

@@ -83,7 +83,6 @@ use crate::tenant::timeline::delete::DeleteTimelineFlow;
use crate::tenant::timeline::uninit::cleanup_timeline_directory;
use crate::virtual_file::VirtualFile;
use crate::walredo::PostgresRedoManager;
use crate::walredo::WalRedoManager;
use crate::TEMP_FILE_SUFFIX;
pub use pageserver_api::models::TenantState;
@@ -230,7 +229,7 @@ pub struct Tenant {
// with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn
// timeout...
gc_cs: tokio::sync::Mutex<()>,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
walredo_mgr: Arc<WalRedoManager>,
// provides access to timeline data sitting in the remote storage
pub(crate) remote_storage: Option<GenericRemoteStorage>,
@@ -247,6 +246,48 @@ pub struct Tenant {
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
}
pub(crate) enum WalRedoManager {
Prod(PostgresRedoManager),
#[cfg(test)]
Test(harness::TestRedoManager),
}
impl From<PostgresRedoManager> for WalRedoManager {
fn from(mgr: PostgresRedoManager) -> Self {
Self::Prod(mgr)
}
}
#[cfg(test)]
impl From<harness::TestRedoManager> for WalRedoManager {
fn from(mgr: harness::TestRedoManager) -> Self {
Self::Test(mgr)
}
}
impl WalRedoManager {
pub async fn request_redo(
&self,
key: crate::repository::Key,
lsn: Lsn,
base_img: Option<(Lsn, bytes::Bytes)>,
records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>,
pg_version: u32,
) -> anyhow::Result<bytes::Bytes> {
match self {
Self::Prod(mgr) => {
mgr.request_redo(key, lsn, base_img, records, pg_version)
.await
}
#[cfg(test)]
Self::Test(mgr) => {
mgr.request_redo(key, lsn, base_img, records, pg_version)
.await
}
}
}
}
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum GetTimelineError {
#[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")]
@@ -488,7 +529,9 @@ impl Tenant {
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
// TODO dedup with spawn_load
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf, tenant_id,
)));
let TenantSharedResources {
broker_client,
@@ -815,7 +858,9 @@ impl Tenant {
tenant_id: TenantId,
reason: String,
) -> Arc<Tenant> {
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf, tenant_id,
)));
Arc::new(Tenant::new(
TenantState::Broken {
reason,
@@ -854,7 +899,9 @@ impl Tenant {
let broker_client = resources.broker_client;
let remote_storage = resources.remote_storage;
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
conf, tenant_id,
)));
let tenant = Tenant::new(
TenantState::Loading,
conf,
@@ -2419,7 +2466,7 @@ impl Tenant {
state: TenantState,
conf: &'static PageServerConf,
attached_conf: AttachedTenantConf,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
walredo_mgr: Arc<WalRedoManager>,
tenant_id: TenantId,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
@@ -3550,7 +3597,7 @@ pub async fn dump_layerfile_from_path(
}
#[cfg(test)]
pub mod harness {
pub(crate) mod harness {
use bytes::{Bytes, BytesMut};
use once_cell::sync::OnceCell;
use std::fs;
@@ -3561,7 +3608,6 @@ pub mod harness {
use crate::deletion_queue::mock::MockDeletionQueue;
use crate::{
config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
walredo::WalRedoManager,
};
use super::*;
@@ -3690,7 +3736,7 @@ pub mod harness {
}
pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result<Arc<Tenant>> {
let walredo_mgr = Arc::new(TestRedoManager);
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
let tenant = Arc::new(Tenant::new(
TenantState::Loading,
@@ -3724,10 +3770,10 @@ pub mod harness {
}
// Mock WAL redo manager that doesn't do much
pub struct TestRedoManager;
pub(crate) struct TestRedoManager;
impl WalRedoManager for TestRedoManager {
fn request_redo(
impl TestRedoManager {
pub async fn request_redo(
&self,
key: Key,
lsn: Lsn,

View File

@@ -81,7 +81,6 @@ use crate::repository::GcResult;
use crate::repository::{Key, Value};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::walredo::WalRedoManager;
use crate::ZERO_PAGE;
use self::delete::DeleteTimelineFlow;
@@ -201,7 +200,7 @@ pub struct Timeline {
last_freeze_ts: RwLock<Instant>,
// WAL redo manager
walredo_mgr: Arc<dyn WalRedoManager + Sync + Send>,
walredo_mgr: Arc<super::WalRedoManager>,
/// Remote storage client.
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
@@ -1471,7 +1470,7 @@ impl Timeline {
timeline_id: TimelineId,
tenant_id: TenantId,
generation: Generation,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
walredo_mgr: Arc<super::WalRedoManager>,
resources: TimelineResources,
pg_version: u32,
initial_logical_size_can_start: Option<completion::Barrier>,
@@ -4324,6 +4323,7 @@ impl Timeline {
let img = match self
.walredo_mgr
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
.await
.context("Failed to reconstruct a page image:")
{
Ok(img) => img,

View File

@@ -70,27 +70,6 @@ pub(crate) struct BufferTag {
pub blknum: u32,
}
///
/// WAL Redo Manager is responsible for replaying WAL records.
///
/// Callers use the WAL redo manager through this abstract interface,
/// which makes it easy to mock it in tests.
pub trait WalRedoManager: Send + Sync {
/// Apply some WAL records.
///
/// The caller passes an old page image, and WAL records that should be
/// applied over it. The return value is a new page image, after applying
/// the reords.
fn request_redo(
&self,
key: Key,
lsn: Lsn,
base_img: Option<(Lsn, Bytes)>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> anyhow::Result<Bytes>;
}
struct ProcessInput {
stdin: ChildStdin,
stderr_fd: RawFd,
@@ -135,14 +114,14 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
///
/// Public interface of WAL redo manager
///
impl WalRedoManager for PostgresRedoManager {
impl PostgresRedoManager {
///
/// Request the WAL redo manager to apply some WAL records
///
/// The WAL redo is handled by a separate thread, so this just sends a request
/// to the thread and waits for response.
///
fn request_redo(
pub async fn request_redo(
&self,
key: Key,
lsn: Lsn,
@@ -1156,15 +1135,15 @@ fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
#[cfg(test)]
mod tests {
use super::{PostgresRedoManager, WalRedoManager};
use super::PostgresRedoManager;
use crate::repository::Key;
use crate::{config::PageServerConf, walrecord::NeonWalRecord};
use bytes::Bytes;
use std::str::FromStr;
use utils::{id::TenantId, lsn::Lsn};
#[test]
fn short_v14_redo() {
#[tokio::test]
async fn short_v14_redo() {
let expected = std::fs::read("fixtures/short_v14_redo.page").unwrap();
let h = RedoHarness::new().unwrap();
@@ -1185,13 +1164,14 @@ mod tests {
short_records(),
14,
)
.await
.unwrap();
assert_eq!(&expected, &*page);
}
#[test]
fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
#[tokio::test]
async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
let h = RedoHarness::new().unwrap();
let page = h
@@ -1211,6 +1191,7 @@ mod tests {
short_records(),
14,
)
.await
.unwrap();
// TODO: there will be some stderr printout, which is forwarded to tracing that could