Compare commits

...

6 Commits

Author SHA1 Message Date
Christian Schwarz
7c691fc87f remove "next" info_span!, it showed up in flamegraphs (not too meaningful though) 2024-08-16 14:40:33 +00:00
Christian Schwarz
fb2d0131e2 add JustReadBoth validation mode 2024-08-16 13:34:01 +00:00
Christian Schwarz
4205300105 revert preceding two WIP parallel mode efforts
Not worth the trouble.
2024-08-16 12:26:30 +02:00
Christian Schwarz
edd06c96fa more WIP to try to get parallel mode 2024-08-16 12:26:30 +02:00
Christian Schwarz
58b857dff2 WIP: parallel mode 2024-08-16 12:26:30 +02:00
Christian Schwarz
26206e8d8a option for concurrent IO 2024-08-16 12:26:30 +02:00

View File

@@ -855,11 +855,21 @@ impl Timeline {
merge_iter: MergeIterator<'a>,
},
ValidatingStreamingKmergeBypassingPageCache {
mode: CompactL0BypassPageCacheValidation,
what: ValidationWhat,
concurrency: ValidationIoConcurrency,
merge_iter: MergeIterator<'a>,
all_keys_iter: VecIter<'a>,
},
}
/// How the validating iterator drives the reads of its two underlying iterators
/// (`all_keys_iter` and `merge_iter`) when it is advanced.
///
/// Derives the usual traits for a fieldless enum so the value can be logged,
/// copied out of a borrowed variant, and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ValidationIoConcurrency {
    /// Await the `all_keys_iter` read to completion, then await the `merge_iter` read.
    Sequential,
    /// Await both reads together through `futures::future::join`:
    /// concurrency, but no parallelism.
    Concurrent,
}
/// Which parts of the items returned by the two iterators are compared against
/// each other when both returned `Ok(Some(..))`.
///
/// Derives the usual traits for a fieldless enum so the value can be logged,
/// copied out of a borrowed variant, and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ValidationWhat {
    /// Compare nothing; both iterators are still advanced.
    Nothing,
    /// Compare the `(key, lsn)` pair of both items.
    KeyLsn,
    /// Compare the full `(key, lsn, value)` triple of both items.
    KeyLsnValue,
}
/// Borrowing iterator over a slice of [`DeltaEntry`] values.
///
/// The single lifetime `'a` is currently used both for the slice borrow and for
/// `DeltaEntry`'s own lifetime parameter; the TODO below tracks separating them.
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
impl AllValuesIter<'_> {
async fn next_all_keys_iter(
@@ -887,10 +897,18 @@ impl Timeline {
Self::next_all_keys_iter(iter, ctx).await
}
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
// advance both iterators
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
let merge_iter_item = merge_iter.next().await;
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { what, concurrency, merge_iter, all_keys_iter } => async {
// advance both iterators. Use concurrency but no parallelism.
let all_keys_iter_item_fut = Self::next_all_keys_iter(all_keys_iter, ctx);
let merge_iter_item_fut = merge_iter.next();
let (all_keys_iter_item, merge_iter_item) = match concurrency {
ValidationIoConcurrency::Sequential => {
(all_keys_iter_item_fut.await, merge_iter_item_fut.await)
},
ValidationIoConcurrency::Concurrent => {
futures::future::join(all_keys_iter_item_fut, merge_iter_item_fut).await
},
};
// compare results & log warnings as needed
macro_rules! rate_limited_warn {
($($arg:tt)*) => {{
@@ -928,16 +946,17 @@ impl Timeline {
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
}
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
match mode {
match what {
ValidationWhat::Nothing => { }
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
CompactL0BypassPageCacheValidation::KeyLsn => {
ValidationWhat::KeyLsn => {
let all_keys = (all_keys_key, all_keys_lsn);
let merge = (merge_key, merge_lsn);
if all_keys != merge {
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
}
}
CompactL0BypassPageCacheValidation::KeyLsnValue => {
ValidationWhat::KeyLsnValue => {
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
let merge = (merge_key, merge_lsn, merge_value);
if all_keys != merge {
@@ -949,7 +968,7 @@ impl Timeline {
}
// in case of mismatch, trust the legacy all_keys_iter_item
all_keys_iter_item
}.instrument(info_span!("next")).await
}.await
}
}
}
@@ -969,7 +988,32 @@ impl Timeline {
match validate {
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
mode: validate.clone(),
what: match &validate {
CompactL0BypassPageCacheValidation::JustReadBoth
| CompactL0BypassPageCacheValidation::JustReadBothConcurrentIo => {
ValidationWhat::Nothing
}
CompactL0BypassPageCacheValidation::KeyLsn
| CompactL0BypassPageCacheValidation::KeyLsnConcurrentIo => {
ValidationWhat::KeyLsn
}
CompactL0BypassPageCacheValidation::KeyLsnValue
| CompactL0BypassPageCacheValidation::KeyLsnValueConcurrentIo => {
ValidationWhat::KeyLsnValue
}
},
concurrency: match validate {
CompactL0BypassPageCacheValidation::JustReadBothConcurrentIo
| CompactL0BypassPageCacheValidation::KeyLsnConcurrentIo
| CompactL0BypassPageCacheValidation::KeyLsnValueConcurrentIo => {
ValidationIoConcurrency::Concurrent
}
CompactL0BypassPageCacheValidation::JustReadBoth
| CompactL0BypassPageCacheValidation::KeyLsn
| CompactL0BypassPageCacheValidation::KeyLsnValue => {
ValidationIoConcurrency::Sequential
}
},
merge_iter,
all_keys_iter: all_keys.iter(),
},
@@ -1389,11 +1433,18 @@ pub enum CompactL0Phase1ValueAccess {
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "kebab-case")]
// NOTE(review): presumably silences clippy's complaint about variants sharing a
// common name prefix/postfix; confirm it is still needed with the current variant set.
#[allow(clippy::enum_variant_names)]
pub enum CompactL0BypassPageCacheValidation {
/// Advance both the old and the new iterator, one after the other, but do not
/// compare the `(key, lsn, value)` items they return.
JustReadBoth,
/// Like [`Self::JustReadBoth`], but perform the IO concurrently.
JustReadBothConcurrentIo,
/// Validate that the series of (key, lsn) pairs are the same.
KeyLsn,
/// Like [`Self::KeyLsn`], but perform the IO concurrently.
KeyLsnConcurrentIo,
/// Validate that the entire output of old and new way is identical.
KeyLsnValue,
/// Like [`Self::KeyLsnValue`], but perform the IO concurrently.
KeyLsnValueConcurrentIo,
}
impl Default for CompactL0Phase1ValueAccess {