mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 18:02:56 +00:00
implement futuresunordered mode
This commit is contained in:
@@ -19,7 +19,9 @@ use pageserver_api::value::Value;
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::future::Future;
|
||||
use std::ops::Range;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
@@ -153,7 +155,7 @@ pub(crate) struct ValuesReconstructState {
|
||||
layers_visited: u32,
|
||||
delta_layers_visited: u32,
|
||||
|
||||
io_concurrency: IoConcurrency,
|
||||
pub(crate) io_concurrency: IoConcurrency,
|
||||
}
|
||||
|
||||
/// The level of IO concurrency to be used on the read path
|
||||
@@ -161,9 +163,12 @@ pub(crate) struct ValuesReconstructState {
|
||||
/// The desired end state is that we always do parallel IO.
|
||||
/// This struct and the dispatching in the impl will be removed once
|
||||
/// we've built enough confidence.
|
||||
enum IoConcurrency {
|
||||
pub(crate) enum IoConcurrency {
|
||||
Serial,
|
||||
Parallel,
|
||||
FuturesUnordered {
|
||||
futures: futures::stream::FuturesUnordered<Pin<Box<dyn Send + Future<Output = ()>>>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl IoConcurrency {
|
||||
@@ -176,6 +181,9 @@ impl IoConcurrency {
|
||||
IoConcurrency::Parallel => {
|
||||
tokio::spawn(fut);
|
||||
}
|
||||
IoConcurrency::FuturesUnordered { futures } => {
|
||||
futures.push(Box::pin(fut));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -204,6 +212,9 @@ impl ValuesReconstructState {
|
||||
match IO_CONCURRENCY.as_str() {
|
||||
"parallel" => IoConcurrency::Parallel,
|
||||
"serial" => IoConcurrency::Serial,
|
||||
"futures-unordered" => IoConcurrency::FuturesUnordered {
|
||||
futures: futures::stream::FuturesUnordered::new(),
|
||||
},
|
||||
x => panic!(
|
||||
"Invalid value for NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY: {}",
|
||||
x
|
||||
|
||||
@@ -3419,6 +3419,14 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
match reconstruct_state.io_concurrency {
|
||||
super::storage_layer::IoConcurrency::Serial => (),
|
||||
super::storage_layer::IoConcurrency::Parallel => (),
|
||||
super::storage_layer::IoConcurrency::FuturesUnordered { ref mut futures } => {
|
||||
while let Some(()) = futures.next().await {}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(TimelineVisitOutcome {
|
||||
completed_keyspace,
|
||||
image_covered_keyspace: image_covered_keyspace.consume_keyspace(),
|
||||
|
||||
@@ -32,7 +32,7 @@ class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig):
|
||||
|
||||
|
||||
PS_DIRECT_IO = ["direct"]
|
||||
PS_IO_CONCURRENCY = ["serial", "parallel"]
|
||||
PS_IO_CONCURRENCY = ["serial", "parallel", "futures-unordered"]
|
||||
EXECUTION = ["concurrent-futures"]
|
||||
|
||||
NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]
|
||||
|
||||
Reference in New Issue
Block a user