From 9da67c4f19f08c745e47e4f8b2b6952c4f27d9c7 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 18 Oct 2023 12:23:06 +0200 Subject: [PATCH] walredo: make request_redo() an async fn (#5559) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stacked atop https://github.com/neondatabase/neon/pull/5557 Prep work for https://github.com/neondatabase/neon/pull/5560 These changes have a 2% impact on `bench_walredo`. That's likely because of the `block_on() in the innermost piece of benchmark-only code. So, it doesn't affect production code. The use of closures in the benchmarking code prevents a straightforward conversion of the whole benchmarking code to async. before: ``` $ cargo bench --features testing --bench bench_walredo Compiling pageserver v0.1.0 (/home/cs/src/neon/pageserver) Finished bench [optimized + debuginfo] target(s) in 2m 11s Running benches/bench_walredo.rs (target/release/deps/bench_walredo-d99a324337dead70) Gnuplot not found, using plotters backend short/short/1 time: [26.363 µs 27.451 µs 28.573 µs] Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) high mild short/short/2 time: [64.340 µs 64.927 µs 65.485 µs] Found 2 outliers among 100 measurements (2.00%) 2 (2.00%) low mild short/short/4 time: [101.98 µs 104.06 µs 106.13 µs] short/short/8 time: [151.42 µs 152.74 µs 154.03 µs] short/short/16 time: [296.30 µs 297.53 µs 298.88 µs] Found 14 outliers among 100 measurements (14.00%) 10 (10.00%) high mild 4 (4.00%) high severe medium/medium/1 time: [225.12 µs 225.90 µs 226.66 µs] Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) low mild medium/medium/2 time: [490.80 µs 491.64 µs 492.49 µs] Found 1 outliers among 100 measurements (1.00%) 1 (1.00%) low mild medium/medium/4 time: [934.47 µs 936.49 µs 938.52 µs] Found 5 outliers among 100 measurements (5.00%) 3 (3.00%) low mild 1 (1.00%) high mild 1 (1.00%) high severe medium/medium/8 time: [1.8364 ms 1.8412 ms 1.8463 ms] Found 4 outliers among 100 measurements (4.00%) 4 (4.00%) high mild medium/medium/16 time: [3.6694 ms 3.6896 ms 3.7104 ms] ``` after: ``` $ cargo bench --features testing --bench bench_walredo Compiling pageserver v0.1.0 (/home/cs/src/neon/pageserver) Finished bench [optimized + debuginfo] target(s) in 2m 11s Running benches/bench_walredo.rs (target/release/deps/bench_walredo-d99a324337dead70) Gnuplot not found, using plotters backend short/short/1 time: [28.345 µs 28.529 µs 28.699 µs] change: [-0.2201% +3.9276% +8.2451%] (p = 0.07 > 0.05) No change in performance detected. Found 17 outliers among 100 measurements (17.00%) 4 (4.00%) low severe 5 (5.00%) high mild 8 (8.00%) high severe short/short/2 time: [66.145 µs 66.719 µs 67.274 µs] change: [+1.5467% +2.7605% +3.9927%] (p = 0.00 < 0.05) Performance has regressed. Found 5 outliers among 100 measurements (5.00%) 5 (5.00%) low mild short/short/4 time: [105.51 µs 107.52 µs 109.49 µs] change: [+0.5023% +3.3196% +6.1986%] (p = 0.02 < 0.05) Change within noise threshold. short/short/8 time: [151.90 µs 153.16 µs 154.41 µs] change: [-1.0001% +0.2779% +1.4221%] (p = 0.65 > 0.05) No change in performance detected. short/short/16 time: [297.38 µs 298.26 µs 299.20 µs] change: [-0.2953% +0.2462% +0.7763%] (p = 0.37 > 0.05) No change in performance detected. Found 2 outliers among 100 measurements (2.00%) 2 (2.00%) high mild medium/medium/1 time: [229.76 µs 230.72 µs 231.69 µs] change: [+1.5804% +2.1354% +2.6635%] (p = 0.00 < 0.05) Performance has regressed. medium/medium/2 time: [501.14 µs 502.31 µs 503.64 µs] change: [+1.8730% +2.1709% +2.5199%] (p = 0.00 < 0.05) Performance has regressed. Found 7 outliers among 100 measurements (7.00%) 1 (1.00%) low mild 1 (1.00%) high mild 5 (5.00%) high severe medium/medium/4 time: [954.15 µs 956.74 µs 959.33 µs] change: [+1.7962% +2.1627% +2.4905%] (p = 0.00 < 0.05) Performance has regressed. medium/medium/8 time: [1.8726 ms 1.8785 ms 1.8848 ms] change: [+1.5858% +2.0240% +2.4626%] (p = 0.00 < 0.05) Performance has regressed. Found 6 outliers among 100 measurements (6.00%) 1 (1.00%) low mild 3 (3.00%) high mild 2 (2.00%) high severe medium/medium/16 time: [3.7565 ms 3.7746 ms 3.7934 ms] change: [+1.5503% +2.3044% +3.0818%] (p = 0.00 < 0.05) Performance has regressed. Found 3 outliers among 100 measurements (3.00%) 3 (3.00%) high mild ``` --- pageserver/benches/bench_walredo.rs | 68 +++++++++++++++++++--------- pageserver/src/deletion_queue.rs | 4 -- pageserver/src/tenant.rs | 70 ++++++++++++++++++++++++----- pageserver/src/tenant/timeline.rs | 6 +-- pageserver/src/walredo.rs | 37 ++++----------- 5 files changed, 116 insertions(+), 69 deletions(-) diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs index 9bcd3fa708..ba41866935 100644 --- a/pageserver/benches/bench_walredo.rs +++ b/pageserver/benches/bench_walredo.rs @@ -32,9 +32,15 @@ fn redo_scenarios(c: &mut Criterion) { let manager = Arc::new(manager); - tracing::info!("executing first"); - short().execute(&manager).unwrap(); - tracing::info!("first executed"); + { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + tracing::info!("executing first"); + short().execute(rt.handle(), &manager).unwrap(); + tracing::info!("first executed"); + } let thread_counts = [1, 2, 4, 8, 16]; @@ -77,9 +83,14 @@ fn add_multithreaded_walredo_requesters( assert_ne!(threads, 0); if threads == 1 { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let handle = rt.handle(); b.iter_batched_ref( || Some(input_factory()), - |input| execute_all(input.take(), manager), + |input| execute_all(input.take(), handle, manager), criterion::BatchSize::PerIteration, ); } else { @@ -95,19 +106,26 @@ fn add_multithreaded_walredo_requesters( let manager = manager.clone(); let barrier = barrier.clone(); let work_rx = work_rx.clone(); - move || loop { - // queue up and wait if we want to go another round - if work_rx.lock().unwrap().recv().is_err() { - break; + move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let handle = rt.handle(); + loop { + // queue up and wait if we want to go another round + if work_rx.lock().unwrap().recv().is_err() { + break; + } + + let input = Some(input_factory()); + + barrier.wait(); + + execute_all(input, handle, &manager).unwrap(); + + barrier.wait(); } - - let input = Some(input_factory()); - - barrier.wait(); - - execute_all(input, &manager).unwrap(); - - barrier.wait(); } }) }) @@ -149,13 +167,17 @@ impl Drop for JoinOnDrop { } } -fn execute_all(input: I, manager: &PostgresRedoManager) -> anyhow::Result<()> +fn execute_all( + input: I, + handle: &tokio::runtime::Handle, + manager: &PostgresRedoManager, +) -> anyhow::Result<()> where I: IntoIterator, { // just fire all requests as fast as possible input.into_iter().try_for_each(|req| { - let page = req.execute(manager)?; + let page = req.execute(handle, manager)?; assert_eq!(page.remaining(), 8192); anyhow::Ok(()) }) @@ -470,9 +492,11 @@ struct Request { } impl Request { - fn execute(self, manager: &PostgresRedoManager) -> anyhow::Result { - use pageserver::walredo::WalRedoManager; - + fn execute( + self, + rt: &tokio::runtime::Handle, + manager: &PostgresRedoManager, + ) -> anyhow::Result { let Request { key, lsn, @@ -481,6 +505,6 @@ impl Request { pg_version, } = self; - manager.request_redo(key, lsn, base_img, records, pg_version) + rt.block_on(manager.request_redo(key, lsn, base_img, records, pg_version)) } } diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index 0bf851a8d7..22efa23f10 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -1298,10 +1298,6 @@ pub(crate) mod mock { } } - pub fn get_executed(&self) -> usize { - self.executed.load(Ordering::Relaxed) - } - #[allow(clippy::await_holding_lock)] pub async fn pump(&self) { if let Some(remote_storage) = &self.remote_storage { diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0d987cfbfb..4dae183aea 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -83,7 +83,6 @@ use crate::tenant::timeline::delete::DeleteTimelineFlow; use crate::tenant::timeline::uninit::cleanup_timeline_directory; use crate::virtual_file::VirtualFile; use crate::walredo::PostgresRedoManager; -use crate::walredo::WalRedoManager; use crate::TEMP_FILE_SUFFIX; pub use pageserver_api::models::TenantState; @@ -230,7 +229,7 @@ pub struct Tenant { // with timelines, which in turn may cause dropping replication connection, expiration of wait_for_lsn // timeout... gc_cs: tokio::sync::Mutex<()>, - walredo_mgr: Arc, + walredo_mgr: Arc, // provides access to timeline data sitting in the remote storage pub(crate) remote_storage: Option, @@ -247,6 +246,48 @@ pub struct Tenant { pub(crate) delete_progress: Arc>, } +pub(crate) enum WalRedoManager { + Prod(PostgresRedoManager), + #[cfg(test)] + Test(harness::TestRedoManager), +} + +impl From for WalRedoManager { + fn from(mgr: PostgresRedoManager) -> Self { + Self::Prod(mgr) + } +} + +#[cfg(test)] +impl From for WalRedoManager { + fn from(mgr: harness::TestRedoManager) -> Self { + Self::Test(mgr) + } +} + +impl WalRedoManager { + pub async fn request_redo( + &self, + key: crate::repository::Key, + lsn: Lsn, + base_img: Option<(Lsn, bytes::Bytes)>, + records: Vec<(Lsn, crate::walrecord::NeonWalRecord)>, + pg_version: u32, + ) -> anyhow::Result { + match self { + Self::Prod(mgr) => { + mgr.request_redo(key, lsn, base_img, records, pg_version) + .await + } + #[cfg(test)] + Self::Test(mgr) => { + mgr.request_redo(key, lsn, base_img, records, pg_version) + .await + } + } + } +} + #[derive(Debug, thiserror::Error, PartialEq, Eq)] pub enum GetTimelineError { #[error("Timeline {tenant_id}/{timeline_id} is not active, state: {state:?}")] @@ -488,7 +529,9 @@ impl Tenant { ctx: &RequestContext, ) -> anyhow::Result> { // TODO dedup with spawn_load - let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id)); + let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new( + conf, tenant_id, + ))); let TenantSharedResources { broker_client, @@ -815,7 +858,9 @@ impl Tenant { tenant_id: TenantId, reason: String, ) -> Arc { - let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id)); + let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new( + conf, tenant_id, + ))); Arc::new(Tenant::new( TenantState::Broken { reason, @@ -854,7 +899,9 @@ impl Tenant { let broker_client = resources.broker_client; let remote_storage = resources.remote_storage; - let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id)); + let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new( + conf, tenant_id, + ))); let tenant = Tenant::new( TenantState::Loading, conf, @@ -2419,7 +2466,7 @@ impl Tenant { state: TenantState, conf: &'static PageServerConf, attached_conf: AttachedTenantConf, - walredo_mgr: Arc, + walredo_mgr: Arc, tenant_id: TenantId, remote_storage: Option, deletion_queue_client: DeletionQueueClient, @@ -3550,7 +3597,7 @@ pub async fn dump_layerfile_from_path( } #[cfg(test)] -pub mod harness { +pub(crate) mod harness { use bytes::{Bytes, BytesMut}; use once_cell::sync::OnceCell; use std::fs; @@ -3561,7 +3608,6 @@ pub mod harness { use crate::deletion_queue::mock::MockDeletionQueue; use crate::{ config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord, - walredo::WalRedoManager, }; use super::*; @@ -3690,7 +3736,7 @@ pub mod harness { } pub async fn try_load(&self, ctx: &RequestContext) -> anyhow::Result> { - let walredo_mgr = Arc::new(TestRedoManager); + let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager)); let tenant = Arc::new(Tenant::new( TenantState::Loading, @@ -3724,10 +3770,10 @@ pub mod harness { } // Mock WAL redo manager that doesn't do much - pub struct TestRedoManager; + pub(crate) struct TestRedoManager; - impl WalRedoManager for TestRedoManager { - fn request_redo( + impl TestRedoManager { + pub async fn request_redo( &self, key: Key, lsn: Lsn, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 250047823e..2c76155e2a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -81,7 +81,6 @@ use crate::repository::GcResult; use crate::repository::{Key, Value}; use crate::task_mgr; use crate::task_mgr::TaskKind; -use crate::walredo::WalRedoManager; use crate::ZERO_PAGE; use self::delete::DeleteTimelineFlow; @@ -201,7 +200,7 @@ pub struct Timeline { last_freeze_ts: RwLock, // WAL redo manager - walredo_mgr: Arc, + walredo_mgr: Arc, /// Remote storage client. /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details. @@ -1471,7 +1470,7 @@ impl Timeline { timeline_id: TimelineId, tenant_id: TenantId, generation: Generation, - walredo_mgr: Arc, + walredo_mgr: Arc, resources: TimelineResources, pg_version: u32, initial_logical_size_can_start: Option, @@ -4324,6 +4323,7 @@ impl Timeline { let img = match self .walredo_mgr .request_redo(key, request_lsn, data.img, data.records, self.pg_version) + .await .context("Failed to reconstruct a page image:") { Ok(img) => img, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 7056ef4f90..7e61a1dc37 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -70,27 +70,6 @@ pub(crate) struct BufferTag { pub blknum: u32, } -/// -/// WAL Redo Manager is responsible for replaying WAL records. -/// -/// Callers use the WAL redo manager through this abstract interface, -/// which makes it easy to mock it in tests. -pub trait WalRedoManager: Send + Sync { - /// Apply some WAL records. - /// - /// The caller passes an old page image, and WAL records that should be - /// applied over it. The return value is a new page image, after applying - /// the reords. - fn request_redo( - &self, - key: Key, - lsn: Lsn, - base_img: Option<(Lsn, Bytes)>, - records: Vec<(Lsn, NeonWalRecord)>, - pg_version: u32, - ) -> anyhow::Result; -} - struct ProcessInput { stdin: ChildStdin, stderr_fd: RawFd, @@ -135,14 +114,14 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool { /// /// Public interface of WAL redo manager /// -impl WalRedoManager for PostgresRedoManager { +impl PostgresRedoManager { /// /// Request the WAL redo manager to apply some WAL records /// /// The WAL redo is handled by a separate thread, so this just sends a request /// to the thread and waits for response. /// - fn request_redo( + pub async fn request_redo( &self, key: Key, lsn: Lsn, @@ -1156,15 +1135,15 @@ fn build_get_page_msg(tag: BufferTag, buf: &mut Vec) { #[cfg(test)] mod tests { - use super::{PostgresRedoManager, WalRedoManager}; + use super::PostgresRedoManager; use crate::repository::Key; use crate::{config::PageServerConf, walrecord::NeonWalRecord}; use bytes::Bytes; use std::str::FromStr; use utils::{id::TenantId, lsn::Lsn}; - #[test] - fn short_v14_redo() { + #[tokio::test] + async fn short_v14_redo() { let expected = std::fs::read("fixtures/short_v14_redo.page").unwrap(); let h = RedoHarness::new().unwrap(); @@ -1185,13 +1164,14 @@ mod tests { short_records(), 14, ) + .await .unwrap(); assert_eq!(&expected, &*page); } - #[test] - fn short_v14_fails_for_wrong_key_but_returns_zero_page() { + #[tokio::test] + async fn short_v14_fails_for_wrong_key_but_returns_zero_page() { let h = RedoHarness::new().unwrap(); let page = h @@ -1211,6 +1191,7 @@ mod tests { short_records(), 14, ) + .await .unwrap(); // TODO: there will be some stderr printout, which is forwarded to tracing that could