From 58b857dff290cd5d6cc8a07053da4fac3b120478 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 16 Aug 2024 11:56:09 +0200 Subject: [PATCH] WIP: parallel mode --- pageserver/src/tenant/timeline/compaction.rs | 30 ++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 13199534be..75ab593736 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -864,6 +864,7 @@ impl Timeline { enum ValidationIoConcurrency { Sequential, Concurrent, + Parallel, } enum ValidationWhat { KeyLsn, @@ -898,8 +899,8 @@ impl Timeline { AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await, AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { what, concurrency, merge_iter, all_keys_iter } => async { // advance both iterators. Use concurrency but no parallelism. - let all_keys_iter_item_fut = Self::next_all_keys_iter(all_keys_iter, ctx); - let merge_iter_item_fut = merge_iter.next(); + let mut all_keys_iter_item_fut = Self::next_all_keys_iter(all_keys_iter, ctx); + let mut merge_iter_item_fut = merge_iter.next(); let (all_keys_iter_item, merge_iter_item) = match concurrency { ValidationIoConcurrency::Sequential => { (all_keys_iter_item_fut.await, merge_iter_item_fut.await) @@ -907,6 +908,31 @@ impl Timeline { ValidationIoConcurrency::Concurrent => { futures::future::join(all_keys_iter_item_fut, merge_iter_item_fut).await }, + ValidationIoConcurrency::Parallel => { + use std::task::Poll::*; + let all_keys_iter_item_fut = std::pin::pin!(all_keys_iter_item_fut); + let merge_iter_item_fut = std::pin::pin!(merge_iter_item_fut); + match (futures::poll!(&mut all_keys_iter_item_fut), futures::poll!(&mut merge_iter_item_fut)) { + (Ready(a),Ready(b)) => (a, b), + (Ready(a),Pending) => { + let b = merge_iter_item_fut.await; + (a, b) + }, + (Pending,Ready(b)) => { + let a = all_keys_iter_item_fut.await; + (a, b) + } + (Pending,Pending) => { + let a = tokio::spawn(all_keys_iter_item_fut); + let b = tokio::spawn(merge_iter_item_fut); + let (a, b) = futures::future::join(a, b).await; + ( + a.map_err(anyhow::Error::new).and_then(|r| r), + b.map_err(anyhow::Error::new).and_then(|r| r), + ) + }, + } + }, }; // compare results & log warnings as needed macro_rules! rate_limited_warn {