From 26206e8d8af7206e15604c818f46d10fbc8a7401 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 16 Aug 2024 11:40:16 +0200 Subject: [PATCH] option for concurrent IO --- pageserver/src/tenant/timeline/compaction.rs | 59 +++++++++++++++++--- 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 7370ec1386..13199534be 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -855,11 +855,20 @@ impl Timeline { merge_iter: MergeIterator<'a>, }, ValidatingStreamingKmergeBypassingPageCache { - mode: CompactL0BypassPageCacheValidation, + what: ValidationWhat, + concurrency: ValidationIoConcurrency, merge_iter: MergeIterator<'a>, all_keys_iter: VecIter<'a>, }, } + enum ValidationIoConcurrency { + Sequential, + Concurrent, + } + enum ValidationWhat { + KeyLsn, + KeyLsnValue, + } type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes impl AllValuesIter<'_> { async fn next_all_keys_iter( @@ -887,10 +896,18 @@ impl Timeline { Self::next_all_keys_iter(iter, ctx).await } AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await, - AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async { - // advance both iterators - let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await; - let merge_iter_item = merge_iter.next().await; + AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { what, concurrency, merge_iter, all_keys_iter } => async { + // advance both iterators. Use concurrency but no parallelism. 
+ let all_keys_iter_item_fut = Self::next_all_keys_iter(all_keys_iter, ctx); + let merge_iter_item_fut = merge_iter.next(); + let (all_keys_iter_item, merge_iter_item) = match concurrency { + ValidationIoConcurrency::Sequential => { + (all_keys_iter_item_fut.await, merge_iter_item_fut.await) + }, + ValidationIoConcurrency::Concurrent => { + futures::future::join(all_keys_iter_item_fut, merge_iter_item_fut).await + }, + }; // compare results & log warnings as needed macro_rules! rate_limited_warn { ($($arg:tt)*) => {{ @@ -928,16 +945,16 @@ impl Timeline { rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some"); } (Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => { - match mode { + match what { // TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one - CompactL0BypassPageCacheValidation::KeyLsn => { + ValidationWhat::KeyLsn => { let all_keys = (all_keys_key, all_keys_lsn); let merge = (merge_key, merge_lsn); if all_keys != merge { rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter"); } } - CompactL0BypassPageCacheValidation::KeyLsnValue => { + ValidationWhat::KeyLsnValue => { let all_keys = (all_keys_key, all_keys_lsn, all_keys_value); let merge = (merge_key, merge_lsn, merge_value); if all_keys != merge { @@ -969,7 +986,26 @@ impl Timeline { match validate { None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter }, Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { - mode: validate.clone(), + what: match &validate { + CompactL0BypassPageCacheValidation::KeyLsn + | CompactL0BypassPageCacheValidation::KeyLsnConcurrentIo => { + ValidationWhat::KeyLsn + } + CompactL0BypassPageCacheValidation::KeyLsnValue + | CompactL0BypassPageCacheValidation::KeyLsnValueConcurrentIo => { + ValidationWhat::KeyLsnValue + } + }, + concurrency: match 
validate { + CompactL0BypassPageCacheValidation::KeyLsnConcurrentIo + | CompactL0BypassPageCacheValidation::KeyLsnValueConcurrentIo => { + ValidationIoConcurrency::Concurrent + } + CompactL0BypassPageCacheValidation::KeyLsn + | CompactL0BypassPageCacheValidation::KeyLsnValue => { + ValidationIoConcurrency::Sequential + } + }, merge_iter, all_keys_iter: all_keys.iter(), }, @@ -1389,11 +1425,16 @@ pub enum CompactL0Phase1ValueAccess { /// See [`CompactL0Phase1ValueAccess::StreamingKmerge`]. #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "kebab-case")] +#[allow(clippy::enum_variant_names)] pub enum CompactL0BypassPageCacheValidation { /// Validate that the series of (key, lsn) pairs are the same. KeyLsn, + /// Like [`Self::KeyLsn`], but perform the IO concurrently. + KeyLsnConcurrentIo, /// Validate that the entire output of old and new way is identical. KeyLsnValue, + /// Like [`Self::KeyLsnValue`], but perform the IO concurrently. + KeyLsnValueConcurrentIo, } impl Default for CompactL0Phase1ValueAccess {