refactor: wrap the oneshots into a properly named abstraction (OnDiskValueIo, etc.)

This commit is contained in:
Christian Schwarz
2025-01-21 17:09:39 +01:00
parent 7ca9112ec1
commit 93625344eb
4 changed files with 107 additions and 73 deletions

View File

@@ -98,14 +98,65 @@ pub(crate) enum OnDiskValue {
/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default)]
pub(crate) struct VectoredValueReconstructState {
pub(crate) on_disk_values: Vec<(
Lsn,
tokio::sync::oneshot::Receiver<Result<OnDiskValue, std::io::Error>>,
)>,
pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>,
pub(crate) situation: ValueReconstructSituation,
}
/// Receiving half of a single on-disk value IO.
///
/// The sending half is held by the matching [`OnDiskValueIo::Required`];
/// the IO side delivers its outcome via [`OnDiskValueIo::complete`], which
/// is then observed through [`OnDiskValueIoWaiter::wait_completion`].
#[derive(Debug)]
pub(crate) struct OnDiskValueIoWaiter {
    // Resolved when the IO side calls `OnDiskValueIo::complete`.
    rx: tokio::sync::oneshot::Receiver<OnDiskValueIoResult>,
}
/// Sending half of a single on-disk value IO.
///
/// Handed out by `ValuesReconstructState::update_key`; the layer code that
/// performs the read must call [`OnDiskValueIo::complete`] with the outcome
/// (hence `#[must_use]`). Dropping a `Required` IO without completing it
/// surfaces as [`WaitCompletionError::IoDropped`] on the waiter side.
#[derive(Debug)]
#[must_use]
pub(crate) enum OnDiskValueIo {
    /// Traversal identified this IO as required to complete the vectored get.
    Required {
        tx: tokio::sync::oneshot::Sender<OnDiskValueIoResult>,
    },
    /// Sparse keyspace reads always read all the values for a given key,
    /// even though only the first value is needed.
    ///
    /// This variant represents the unnecessary IOs for those values at lower LSNs
    /// that aren't needed, but are currently still being done.
    ///
    /// The execution of unnecessary IOs was a pre-existing behavior before concurrent IO.
    /// We added this explicit representation here so that we can drop
    /// unnecessary IO results immediately, instead of buffering them in
    /// `oneshot` channels inside [`VectoredValueReconstructState`] until
    /// [`VectoredValueReconstructState::collect_pending_ios`] gets called.
    Unnecessary,
}
/// Outcome of a single on-disk value read: the value, or the IO error that occurred.
type OnDiskValueIoResult = Result<OnDiskValue, std::io::Error>;
impl OnDiskValueIo {
    /// Deliver the outcome of this IO.
    ///
    /// For [`OnDiskValueIo::Required`], the result is forwarded to the
    /// corresponding [`OnDiskValueIoWaiter`]; for
    /// [`OnDiskValueIo::Unnecessary`], it is dropped on the floor.
    pub(crate) fn complete(self, res: OnDiskValueIoResult) {
        let OnDiskValueIo::Required { tx } = self else {
            // Unnecessary IO: nobody cares, see the variant doc comment.
            return;
        };
        // A send error means the waiter was dropped; nothing useful to do then.
        let _ = tx.send(res);
    }
}
/// Error returned by [`OnDiskValueIoWaiter::wait_completion`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum WaitCompletionError {
    /// The sending [`OnDiskValueIo`] was dropped without `complete()` being
    /// called — most likely because the sidecar task running the IO panicked.
    #[error("OnDiskValueIo was dropped without completing, likely the sidecar task panicked")]
    IoDropped,
}
impl OnDiskValueIoWaiter {
    /// Wait until the matching [`OnDiskValueIo`] has been completed, and
    /// return the IO's result.
    ///
    /// Fails with [`WaitCompletionError::IoDropped`] if the sender was
    /// dropped without ever calling `complete()`.
    pub(crate) async fn wait_completion(self) -> Result<OnDiskValueIoResult, WaitCompletionError> {
        // NB: for Unnecessary IOs, this method never gets called because we don't add them to `on_disk_values`.
        match self.rx.await {
            Ok(res) => Ok(res),
            Err(_recv_err) => Err(WaitCompletionError::IoDropped),
        }
    }
}
impl VectoredValueReconstructState {
/// # Cancel-Safety
///
@@ -129,8 +180,9 @@ impl VectoredValueReconstructState {
// Revisit this when IO futures are replaced with a more sophisticated IO system
// and an IO scheduler, where we know which IOs were submitted and which ones
// just queued. Cf the comment on IoConcurrency::spawn_io.
for (lsn, fut) in self.on_disk_values {
let value_recv_res = fut
for (lsn, waiter) in self.on_disk_values {
let value_recv_res = waiter
.wait_completion()
// we rely on the caller to poll us to completion, so this is not a bail point
.await;
// Force not bailing early by wrapping the code into a closure.
@@ -140,10 +192,9 @@ impl VectoredValueReconstructState {
(Err(_), _) => {
// We've already failed, no need to process more.
}
(Ok(_), Err(recv_error)) => {
(Ok(_), Err(wait_err)) => {
// This shouldn't happen - likely the sidecar task panicked.
let recv_error: tokio::sync::oneshot::error::RecvError = recv_error;
res = Err(PageReconstructError::Other(recv_error.into()));
res = Err(PageReconstructError::Other(wait_err.into()));
}
(Ok(_), Ok(Err(err))) => {
let err: std::io::Error = err;
@@ -587,31 +638,28 @@ impl ValuesReconstructState {
///
/// If the key is in the sparse keyspace (i.e., aux files), we do not track them in
/// `key_done`.
pub(crate) fn update_key(
&mut self,
key: &Key,
lsn: Lsn,
completes: bool,
value: tokio::sync::oneshot::Receiver<Result<OnDiskValue, std::io::Error>>,
) -> ValueReconstructSituation {
// TODO: rename this method & update description.
pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, completes: bool) -> OnDiskValueIo {
let state = self.keys.entry(*key).or_default();
let is_sparse_key = key.is_sparse();
match state.situation {
let required_io = match state.situation {
ValueReconstructSituation::Complete => {
if is_sparse_key {
// Sparse keyspace might be visited multiple times because
// we don't track unmapped keyspaces.
return ValueReconstructSituation::Complete;
return OnDiskValueIo::Unnecessary;
} else {
unreachable!()
}
}
ValueReconstructSituation::Continue => {
state.on_disk_values.push((lsn, value));
let (tx, rx) = tokio::sync::oneshot::channel();
state.on_disk_values.push((lsn, OnDiskValueIoWaiter { rx }));
OnDiskValueIo::Required { tx }
}
}
};
if completes && state.situation == ValueReconstructSituation::Continue {
state.situation = ValueReconstructSituation::Complete;
@@ -620,7 +668,7 @@ impl ValuesReconstructState {
}
}
state.situation
required_io
}
/// Returns the key space describing the keys that have

View File

@@ -66,7 +66,7 @@ use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::{self, OnceCell};
use tokio::sync::OnceCell;
use tokio_epoll_uring::IoBuf;
use tracing::*;
@@ -77,7 +77,8 @@ use utils::{
};
use super::{
AsLayerDesc, LayerName, OnDiskValue, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
ValuesReconstructState,
};
///
@@ -1009,19 +1010,14 @@ impl DeltaLayerInner {
// This is the order that `ReconstructState` requires such that it can
// track when a key is done.
for read in reads.into_iter().rev() {
let mut senders: HashMap<
(Key, Lsn),
sync::oneshot::Sender<Result<OnDiskValue, std::io::Error>>,
> = Default::default();
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
for (_, blob_meta) in read.blobs_at.as_slice().iter().rev() {
let (tx, rx) = sync::oneshot::channel();
senders.insert((blob_meta.key, blob_meta.lsn), tx);
reconstruct_state.update_key(
let io = reconstruct_state.update_key(
&blob_meta.key,
blob_meta.lsn,
blob_meta.will_init,
rx,
);
ios.insert((blob_meta.key, blob_meta.lsn), io);
}
let read_extend_residency = this.clone();
@@ -1037,8 +1033,7 @@ impl DeltaLayerInner {
Ok(blobs_buf) => {
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter().rev() {
let sender =
senders.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
let io = ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
if Some(meta.meta.key) == ignore_key_with_err {
continue;
@@ -1050,22 +1045,24 @@ impl DeltaLayerInner {
Err(e) => {
ignore_key_with_err = Some(meta.meta.key);
let _ = sender.send(Err(e));
io.complete(Err(e));
continue;
}
};
let _ = sender.send(Ok(OnDiskValue::WalRecordOrImage(
io.complete(Ok(OnDiskValue::WalRecordOrImage(
blob_read.into_bytes(),
)));
}
assert!(senders.is_empty());
assert!(ios.is_empty());
}
Err(err) => {
for (_, sender) in senders {
let _ = sender
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
for (_, sender) in ios {
sender.complete(Err(std::io::Error::new(
err.kind(),
"vec read failed",
)));
}
}
}

View File

@@ -62,7 +62,6 @@ use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tracing::*;
@@ -75,7 +74,8 @@ use utils::{
use super::layer_name::ImageLayerName;
use super::{
AsLayerDesc, LayerName, OnDiskValue, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer,
ValuesReconstructState,
};
///
@@ -588,15 +588,10 @@ impl ImageLayerInner {
.into();
for read in reads.into_iter() {
let mut senders: HashMap<
(Key, Lsn),
oneshot::Sender<Result<OnDiskValue, std::io::Error>>,
> = Default::default();
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
for (_, blob_meta) in read.blobs_at.as_slice() {
let (tx, rx) = oneshot::channel();
senders.insert((blob_meta.key, blob_meta.lsn), tx);
reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true, rx);
let io = reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true);
ios.insert((blob_meta.key, blob_meta.lsn), io);
}
let buf_size = read.size();
@@ -641,28 +636,29 @@ impl ImageLayerInner {
Ok(blobs_buf) => {
let view = BufView::new_slice(&blobs_buf.buf);
for meta in blobs_buf.blobs.iter() {
let sender =
senders.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
let io: OnDiskValueIo =
ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap();
let img_buf = meta.read(&view).await;
let img_buf = match img_buf {
Ok(img_buf) => img_buf,
Err(e) => {
let _ = sender.send(Err(e));
io.complete(Err(e));
continue;
}
};
let _ =
sender.send(Ok(OnDiskValue::RawImage(img_buf.into_bytes())));
io.complete(Ok(OnDiskValue::RawImage(img_buf.into_bytes())));
}
assert!(senders.is_empty());
assert!(ios.is_empty());
}
Err(err) => {
for (_, sender) in senders {
let _ = sender
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
for (_, io) in ios {
io.complete(Err(std::io::Error::new(
err.kind(),
"vec read failed",
)));
}
}
}

View File

@@ -8,7 +8,7 @@ use crate::assert_u64_eq_usize::{u64_to_usize, U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::storage_layer::OnDiskValue;
use crate::tenant::storage_layer::{OnDiskValue, OnDiskValueIo};
use crate::tenant::timeline::GetVectoredError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache};
@@ -430,10 +430,7 @@ impl InMemoryLayer {
read: vectored_dio_read::LogicalRead<Vec<u8>>,
}
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
let mut senders: HashMap<
(Key, Lsn),
tokio::sync::oneshot::Sender<Result<OnDiskValue, std::io::Error>>,
> = Default::default();
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
let lsn_range = self.start_lsn..end_lsn;
@@ -460,9 +457,8 @@ impl InMemoryLayer {
),
});
let (tx, rx) = tokio::sync::oneshot::channel();
senders.insert((key, *entry_lsn), tx);
reconstruct_state.update_key(&key, *entry_lsn, will_init, rx);
let io = reconstruct_state.update_key(&key, *entry_lsn, will_init);
ios.insert((key, *entry_lsn), io);
if will_init {
break;
@@ -488,25 +484,22 @@ impl InMemoryLayer {
for (key, value_reads) in reads {
for ValueRead { entry_lsn, read } in value_reads {
let sender = senders
.remove(&(key, entry_lsn))
.expect("sender must exist");
let io = ios.remove(&(key, entry_lsn)).expect("sender must exist");
match read.into_result().expect("we run execute() above") {
Err(e) => {
let _ = sender.send(Err(std::io::Error::new(
io.complete(Err(std::io::Error::new(
e.kind(),
"dio vec read failed",
)));
}
Ok(value_buf) => {
let _ = sender
.send(Ok(OnDiskValue::WalRecordOrImage(value_buf.into())));
io.complete(Ok(OnDiskValue::WalRecordOrImage(value_buf.into())));
}
}
}
}
assert!(senders.is_empty());
assert!(ios.is_empty());
// Keep layer existent until this IO is done;
// This is kinda forced for InMemoryLayer because we need to inner.read() anyway,