mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 19:40:39 +00:00
option for concurrent IO
This commit is contained in:
@@ -855,11 +855,20 @@ impl Timeline {
|
||||
merge_iter: MergeIterator<'a>,
|
||||
},
|
||||
ValidatingStreamingKmergeBypassingPageCache {
|
||||
mode: CompactL0BypassPageCacheValidation,
|
||||
what: ValidationWhat,
|
||||
concurrency: ValidationIoConcurrency,
|
||||
merge_iter: MergeIterator<'a>,
|
||||
all_keys_iter: VecIter<'a>,
|
||||
},
|
||||
}
|
||||
enum ValidationIoConcurrency {
|
||||
Sequential,
|
||||
Concurrent,
|
||||
}
|
||||
enum ValidationWhat {
|
||||
KeyLsn,
|
||||
KeyLsnValue,
|
||||
}
|
||||
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
|
||||
impl AllValuesIter<'_> {
|
||||
async fn next_all_keys_iter(
|
||||
@@ -887,10 +896,18 @@ impl Timeline {
|
||||
Self::next_all_keys_iter(iter, ctx).await
|
||||
}
|
||||
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
|
||||
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
|
||||
// advance both iterators
|
||||
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
|
||||
let merge_iter_item = merge_iter.next().await;
|
||||
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { what, concurrency, merge_iter, all_keys_iter } => async {
|
||||
// advance both iterators. Use concurrency but no parallelism.
|
||||
let all_keys_iter_item_fut = Self::next_all_keys_iter(all_keys_iter, ctx);
|
||||
let merge_iter_item_fut = merge_iter.next();
|
||||
let (all_keys_iter_item, merge_iter_item) = match concurrency {
|
||||
ValidationIoConcurrency::Sequential => {
|
||||
(all_keys_iter_item_fut.await, merge_iter_item_fut.await)
|
||||
},
|
||||
ValidationIoConcurrency::Concurrent => {
|
||||
futures::future::join(all_keys_iter_item_fut, merge_iter_item_fut).await
|
||||
},
|
||||
};
|
||||
// compare results & log warnings as needed
|
||||
macro_rules! rate_limited_warn {
|
||||
($($arg:tt)*) => {{
|
||||
@@ -928,16 +945,16 @@ impl Timeline {
|
||||
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
|
||||
}
|
||||
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
|
||||
match mode {
|
||||
match what {
|
||||
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
|
||||
CompactL0BypassPageCacheValidation::KeyLsn => {
|
||||
ValidationWhat::KeyLsn => {
|
||||
let all_keys = (all_keys_key, all_keys_lsn);
|
||||
let merge = (merge_key, merge_lsn);
|
||||
if all_keys != merge {
|
||||
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
|
||||
}
|
||||
}
|
||||
CompactL0BypassPageCacheValidation::KeyLsnValue => {
|
||||
ValidationWhat::KeyLsnValue => {
|
||||
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
|
||||
let merge = (merge_key, merge_lsn, merge_value);
|
||||
if all_keys != merge {
|
||||
@@ -969,7 +986,26 @@ impl Timeline {
|
||||
match validate {
|
||||
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
|
||||
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
|
||||
mode: validate.clone(),
|
||||
what: match &validate {
|
||||
CompactL0BypassPageCacheValidation::KeyLsn
|
||||
| CompactL0BypassPageCacheValidation::KeyLsnConcurrentIo => {
|
||||
ValidationWhat::KeyLsn
|
||||
}
|
||||
CompactL0BypassPageCacheValidation::KeyLsnValue
|
||||
| CompactL0BypassPageCacheValidation::KeyLsnValueConcurrentIo => {
|
||||
ValidationWhat::KeyLsnValue
|
||||
}
|
||||
},
|
||||
concurrency: match validate {
|
||||
CompactL0BypassPageCacheValidation::KeyLsnConcurrentIo
|
||||
| CompactL0BypassPageCacheValidation::KeyLsnValueConcurrentIo => {
|
||||
ValidationIoConcurrency::Concurrent
|
||||
}
|
||||
CompactL0BypassPageCacheValidation::KeyLsn
|
||||
| CompactL0BypassPageCacheValidation::KeyLsnValue => {
|
||||
ValidationIoConcurrency::Sequential
|
||||
}
|
||||
},
|
||||
merge_iter,
|
||||
all_keys_iter: all_keys.iter(),
|
||||
},
|
||||
@@ -1389,11 +1425,16 @@ pub enum CompactL0Phase1ValueAccess {
|
||||
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
pub enum CompactL0BypassPageCacheValidation {
|
||||
/// Validate that the series of (key, lsn) pairs are the same.
|
||||
KeyLsn,
|
||||
// Like [`KeyLsn`], but perform the IO concurrently.
|
||||
KeyLsnConcurrentIo,
|
||||
/// Validate that the entire output of old and new way is identical.
|
||||
KeyLsnValue,
|
||||
// Like [`KeyLsnValue`], but perform the IO concurrently.
|
||||
KeyLsnValueConcurrentIo,
|
||||
}
|
||||
|
||||
impl Default for CompactL0Phase1ValueAccess {
|
||||
|
||||
Reference in New Issue
Block a user