WIP: parallel mode

This commit is contained in:
Christian Schwarz
2024-08-16 11:56:09 +02:00
parent 26206e8d8a
commit 58b857dff2

View File

@@ -864,6 +864,7 @@ impl Timeline {
enum ValidationIoConcurrency {
Sequential,
Concurrent,
Parallel,
}
enum ValidationWhat {
KeyLsn,
@@ -898,8 +899,8 @@ impl Timeline {
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { what, concurrency, merge_iter, all_keys_iter } => async {
// advance both iterators. Use concurrency but no parallelism.
let all_keys_iter_item_fut = Self::next_all_keys_iter(all_keys_iter, ctx);
let merge_iter_item_fut = merge_iter.next();
let mut all_keys_iter_item_fut = Self::next_all_keys_iter(all_keys_iter, ctx);
let mut merge_iter_item_fut = merge_iter.next();
let (all_keys_iter_item, merge_iter_item) = match concurrency {
ValidationIoConcurrency::Sequential => {
(all_keys_iter_item_fut.await, merge_iter_item_fut.await)
@@ -907,6 +908,31 @@ impl Timeline {
ValidationIoConcurrency::Concurrent => {
futures::future::join(all_keys_iter_item_fut, merge_iter_item_fut).await
},
ValidationIoConcurrency::Parallel => {
use std::task::Poll::*;
let all_keys_iter_item_fut = std::pin::pin!(all_keys_iter_item_fut);
let merge_iter_item_fut = std::pin::pin!(merge_iter_item_fut);
match (futures::poll!(&mut all_keys_iter_item_fut), futures::poll!(&mut merge_iter_item_fut)) {
(Ready(a),Ready(b)) => (a, b),
(Ready(a),Pending) => {
let b = merge_iter_item_fut.await;
(a, b)
},
(Pending,Ready(b)) => {
let a = all_keys_iter_item_fut.await;
(a, b)
}
(Pending,Pending) => {
let a = tokio::spawn(all_keys_iter_item_fut);
let b = tokio::spawn(merge_iter_item_fut);
let (a, b) = futures::future::join(a, b).await;
(
a.map_err(anyhow::Error::new).and_then(|r| r),
b.map_err(anyhow::Error::new).and_then(|r| r),
)
},
}
},
};
// compare results & log warnings as needed
macro_rules! rate_limited_warn {