From 71b6aa2ab74dc42c48798c6a01f159c9fa16c367 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 12 Dec 2024 13:55:35 +0100 Subject: [PATCH] implement futuresunordered mode --- pageserver/src/tenant/storage_layer.rs | 15 +++++++++++++-- pageserver/src/tenant/timeline.rs | 8 ++++++++ .../pageserver/test_page_service_batching.py | 2 +- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 2511498b16..c522e88b89 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -19,7 +19,9 @@ use pageserver_api::value::Value; use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{BinaryHeap, HashMap}; +use std::future::Future; use std::ops::Range; +use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -153,7 +155,7 @@ pub(crate) struct ValuesReconstructState { layers_visited: u32, delta_layers_visited: u32, - io_concurrency: IoConcurrency, + pub(crate) io_concurrency: IoConcurrency, } /// The level of IO concurrency to be used on the read path @@ -161,9 +163,12 @@ pub(crate) struct ValuesReconstructState { /// The desired end state is that we always do parallel IO. /// This struct and the dispatching in the impl will be removed once /// we've built enough confidence. -enum IoConcurrency { +pub(crate) enum IoConcurrency { Serial, Parallel, + FuturesUnordered { + futures: futures::stream::FuturesUnordered>>>, + }, } impl IoConcurrency { @@ -176,6 +181,9 @@ impl IoConcurrency { IoConcurrency::Parallel => { tokio::spawn(fut); } + IoConcurrency::FuturesUnordered { futures } => { + futures.push(Box::pin(fut)); + } } } } @@ -204,6 +212,9 @@ impl ValuesReconstructState { match IO_CONCURRENCY.as_str() { "parallel" => IoConcurrency::Parallel, "serial" => IoConcurrency::Serial, + "futures-unordered" => IoConcurrency::FuturesUnordered { + futures: futures::stream::FuturesUnordered::new(), + }, x => panic!( "Invalid value for NEON_PAGESERVER_VALUE_RECONSTRUCT_IO_CONCURRENCY: {}", x diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index fe9105f33b..cd7e4d250d 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3419,6 +3419,14 @@ impl Timeline { } } + match reconstruct_state.io_concurrency { + super::storage_layer::IoConcurrency::Serial => (), + super::storage_layer::IoConcurrency::Parallel => (), + super::storage_layer::IoConcurrency::FuturesUnordered { ref mut futures } => { + while let Some(()) = futures.next().await {} + } + } + Ok(TimelineVisitOutcome { completed_keyspace, image_covered_keyspace: image_covered_keyspace.consume_keyspace(), diff --git a/test_runner/performance/pageserver/test_page_service_batching.py b/test_runner/performance/pageserver/test_page_service_batching.py index eaa592d370..a2a4015cbf 100644 --- a/test_runner/performance/pageserver/test_page_service_batching.py +++ b/test_runner/performance/pageserver/test_page_service_batching.py @@ -32,7 +32,7 @@ class PageServicePipeliningConfigPipelined(PageServicePipeliningConfig): PS_DIRECT_IO = ["direct"] -PS_IO_CONCURRENCY = ["serial", "parallel"] +PS_IO_CONCURRENCY = ["serial", "parallel", "futures-unordered"] EXECUTION = ["concurrent-futures"] NON_BATCHABLE: list[PageServicePipeliningConfig] = [PageServicePipeliningConfigSerial()]