mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
Materialize future ios
This commit is contained in:
@@ -20,7 +20,7 @@ use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::collections::{BTreeMap, BinaryHeap, HashMap};
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
@@ -92,7 +92,7 @@ pub(crate) struct VectoredValueReconstructState {
|
||||
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
)>,
|
||||
|
||||
situation: ValueReconstructSituation,
|
||||
pub(crate) situation: ValueReconstructSituation,
|
||||
}
|
||||
|
||||
impl VectoredValueReconstructState {
|
||||
|
||||
@@ -18,6 +18,7 @@ use camino::Utf8Path;
|
||||
use chrono::{DateTime, Utc};
|
||||
use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use futures::{stream::FuturesUnordered, StreamExt};
|
||||
use handle::ShardTimelineId;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
@@ -69,7 +70,7 @@ use crate::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
metadata::TimelineMetadata,
|
||||
storage_layer::{
|
||||
convert, inmemory_layer::IndexEntry, PersistentLayerDesc,
|
||||
convert, inmemory_layer::IndexEntry, PersistentLayerDesc, ValueReconstructSituation,
|
||||
},
|
||||
},
|
||||
walredo,
|
||||
@@ -1128,35 +1129,33 @@ impl Timeline {
|
||||
.await?;
|
||||
get_data_timer.stop_and_record();
|
||||
|
||||
// transform reconstruct state which is per key into a map
|
||||
// layer => all reads from that layer
|
||||
// struct KeyWaiter {
|
||||
// img: Option<oneshot::Receiver<Bytes>>,
|
||||
// values: Vec<oneshot::Receiver<Bytes>>,
|
||||
// }
|
||||
// let mut key_waiters: BTreeMap<Key, Result<KeyWaiter, PageReconstructError>> = todo!();
|
||||
|
||||
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
|
||||
.for_get_kind(get_kind)
|
||||
.start_timer();
|
||||
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
|
||||
let layers_visited = reconstruct_state.get_layers_visited();
|
||||
|
||||
let futs = FuturesUnordered::new();
|
||||
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
|
||||
match res {
|
||||
Err(err) => {
|
||||
results.insert(key, Err(err));
|
||||
}
|
||||
Ok(state) => {
|
||||
let state = convert(state)
|
||||
.await
|
||||
.map_err(|err| GetVectoredError::Other(err.into()))?;
|
||||
futs.push({
|
||||
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
|
||||
async move {
|
||||
let state = res.expect("Read path is infallible");
|
||||
assert!(matches!(state.situation, ValueReconstructSituation::Complete));
|
||||
|
||||
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
|
||||
results.insert(key, reconstruct_res);
|
||||
let converted = match convert(state).await {
|
||||
Ok(ok) => ok,
|
||||
Err(err) => {
|
||||
return (key, Err(err));
|
||||
}
|
||||
};
|
||||
|
||||
(key, walredo_self.reconstruct_value(key, lsn, converted).await)
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
let results = futs.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>().await;
|
||||
|
||||
reconstruct_timer.stop_and_record();
|
||||
|
||||
// For aux file keys (v1 or v2) the vectored read path does not return an error
|
||||
|
||||
Reference in New Issue
Block a user