diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 6ce31e07d0..1ae372c0d8 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -20,7 +20,7 @@ use pageserver_api::key::Key; use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; use std::cmp::Ordering; use std::collections::hash_map::Entry; -use std::collections::{BinaryHeap, HashMap}; +use std::collections::{BTreeMap, BinaryHeap, HashMap}; use std::ops::Range; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -92,7 +92,7 @@ pub(crate) struct VectoredValueReconstructState { tokio::sync::oneshot::Receiver>, )>, - situation: ValueReconstructSituation, + pub(crate) situation: ValueReconstructSituation, } impl VectoredValueReconstructState { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f5a380b913..85fb3f0d12 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -18,6 +18,7 @@ use camino::Utf8Path; use chrono::{DateTime, Utc}; use enumset::EnumSet; use fail::fail_point; +use futures::{stream::FuturesUnordered, StreamExt}; use handle::ShardTimelineId; use once_cell::sync::Lazy; use pageserver_api::{ @@ -69,7 +70,7 @@ use crate::{ layer_map::{LayerMap, SearchResult}, metadata::TimelineMetadata, storage_layer::{ - convert, inmemory_layer::IndexEntry, PersistentLayerDesc, + convert, inmemory_layer::IndexEntry, PersistentLayerDesc, ValueReconstructSituation, }, }, walredo, @@ -1128,35 +1129,33 @@ impl Timeline { .await?; get_data_timer.stop_and_record(); - // transform reconstruct state which is per key into a map - // layer => all reads from that layer - // struct KeyWaiter { - // img: Option>, - // values: Vec>, - // } - // let mut key_waiters: BTreeMap> = todo!(); - let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME .for_get_kind(get_kind) .start_timer(); - let mut results: BTreeMap> = BTreeMap::new(); let layers_visited = reconstruct_state.get_layers_visited(); + let futs = FuturesUnordered::new(); for (key, res) in std::mem::take(&mut reconstruct_state.keys) { - match res { - Err(err) => { - results.insert(key, Err(err)); - } - Ok(state) => { - let state = convert(state) - .await - .map_err(|err| GetVectoredError::Other(err.into()))?; + futs.push({ + let walredo_self = self.myself.upgrade().expect("&self method holds the arc"); + async move { + let state = res.expect("Read path is infallible"); + assert!(matches!(state.situation, ValueReconstructSituation::Complete)); - let reconstruct_res = self.reconstruct_value(key, lsn, state).await; - results.insert(key, reconstruct_res); + let converted = match convert(state).await { + Ok(ok) => ok, + Err(err) => { + return (key, Err(err)); + } + }; + + (key, walredo_self.reconstruct_value(key, lsn, converted).await) } - } + }); } + + let results = futs.collect::>>().await; + reconstruct_timer.stop_and_record(); // For aux file keys (v1 or v2) the vectored read path does not return an error