diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 1e920f1d2a..592e70215d 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -98,14 +98,65 @@ pub(crate) enum OnDiskValue { /// Reconstruct data accumulated for a single key during a vectored get #[derive(Debug, Default)] pub(crate) struct VectoredValueReconstructState { - pub(crate) on_disk_values: Vec<( - Lsn, - tokio::sync::oneshot::Receiver>, - )>, + pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>, pub(crate) situation: ValueReconstructSituation, } +#[derive(Debug)] +pub(crate) struct OnDiskValueIoWaiter { + rx: tokio::sync::oneshot::Receiver, +} + +#[derive(Debug)] +#[must_use] +pub(crate) enum OnDiskValueIo { + /// Traversal identified this IO as required to complete the vectored get. + Required { + tx: tokio::sync::oneshot::Sender, + }, + /// Sparse keyspace reads always read all the values for a given key, + /// even though only the first value is needed. + /// + /// This variant represents the unnecessary IOs for those values at lower LSNs + /// that aren't needed, but are currently still being done. + /// + /// The execution of unnecessary IOs was a pre-existing behavior before concurrent IO. + /// We added this explicit representation here so that we can drop + /// unnecessary IO results immediately, instead of buffering them in + /// `oneshot` channels inside [`VectoredValueReconstructState`] until + /// [`VectoredValueReconstructState::collect_pending_ios`] gets called. + Unnecessary, +} + +type OnDiskValueIoResult = Result; + +impl OnDiskValueIo { + pub(crate) fn complete(self, res: OnDiskValueIoResult) { + match self { + OnDiskValueIo::Required { tx } => { + let _ = tx.send(res); + } + OnDiskValueIo::Unnecessary => { + // Nobody cared, see variant doc comment. + } + } + } +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum WaitCompletionError { + #[error("OnDiskValueIo was dropped without completing, likely the sidecar task panicked")] + IoDropped, +} + +impl OnDiskValueIoWaiter { + pub(crate) async fn wait_completion(self) -> Result { + // NB: for Unnecessary IOs, this method never gets called because we don't add them to `on_disk_values`. + self.rx.await.map_err(|_| WaitCompletionError::IoDropped) + } +} + impl VectoredValueReconstructState { /// # Cancel-Safety /// @@ -129,8 +180,9 @@ impl VectoredValueReconstructState { // Revisit this when IO futures are replaced with a more sophisticated IO system // and an IO scheduler, where we know which IOs were submitted and which ones // just queued. Cf the comment on IoConcurrency::spawn_io. - for (lsn, fut) in self.on_disk_values { - let value_recv_res = fut + for (lsn, waiter) in self.on_disk_values { + let value_recv_res = waiter + .wait_completion() // we rely on the caller to poll us to completion, so this is not a bail point .await; // Force not bailing early by wrapping the code into a closure. @@ -140,10 +192,9 @@ impl VectoredValueReconstructState { (Err(_), _) => { // We've already failed, no need to process more. } - (Ok(_), Err(recv_error)) => { + (Ok(_), Err(wait_err)) => { // This shouldn't happen - likely the sidecar task panicked. - let recv_error: tokio::sync::oneshot::error::RecvError = recv_error; - res = Err(PageReconstructError::Other(recv_error.into())); + res = Err(PageReconstructError::Other(wait_err.into())); } (Ok(_), Ok(Err(err))) => { let err: std::io::Error = err; @@ -587,31 +638,28 @@ impl ValuesReconstructState { /// /// If the key is in the sparse keyspace (i.e., aux files), we do not track them in /// `key_done`. - pub(crate) fn update_key( - &mut self, - key: &Key, - lsn: Lsn, - completes: bool, - value: tokio::sync::oneshot::Receiver>, - ) -> ValueReconstructSituation { + // TODO: rename this method & update description. + pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, completes: bool) -> OnDiskValueIo { let state = self.keys.entry(*key).or_default(); let is_sparse_key = key.is_sparse(); - match state.situation { + let required_io = match state.situation { ValueReconstructSituation::Complete => { if is_sparse_key { // Sparse keyspace might be visited multiple times because // we don't track unmapped keyspaces. - return ValueReconstructSituation::Complete; + return OnDiskValueIo::Unnecessary; } else { unreachable!() } } ValueReconstructSituation::Continue => { - state.on_disk_values.push((lsn, value)); + let (tx, rx) = tokio::sync::oneshot::channel(); + state.on_disk_values.push((lsn, OnDiskValueIoWaiter { rx })); + OnDiskValueIo::Required { tx } } - } + }; if completes && state.situation == ValueReconstructSituation::Continue { state.situation = ValueReconstructSituation::Complete; @@ -620,7 +668,7 @@ impl ValuesReconstructState { } } - state.situation + required_io } /// Returns the key space describing the keys that have diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index b0ad6d31c8..e32fb582f8 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -66,7 +66,7 @@ use std::ops::Range; use std::os::unix::fs::FileExt; use std::str::FromStr; use std::sync::Arc; -use tokio::sync::{self, OnceCell}; +use tokio::sync::OnceCell; use tokio_epoll_uring::IoBuf; use tracing::*; @@ -77,7 +77,8 @@ use utils::{ }; use super::{ - AsLayerDesc, LayerName, OnDiskValue, PersistentLayerDesc, ResidentLayer, ValuesReconstructState, + AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer, + ValuesReconstructState, }; /// @@ -1009,19 +1010,14 @@ impl DeltaLayerInner { // This is the order that `ReconstructState` requires such that it can // track when a key is done. for read in reads.into_iter().rev() { - let mut senders: HashMap< - (Key, Lsn), - sync::oneshot::Sender>, - > = Default::default(); + let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default(); for (_, blob_meta) in read.blobs_at.as_slice().iter().rev() { - let (tx, rx) = sync::oneshot::channel(); - senders.insert((blob_meta.key, blob_meta.lsn), tx); - reconstruct_state.update_key( + let io = reconstruct_state.update_key( &blob_meta.key, blob_meta.lsn, blob_meta.will_init, - rx, ); + ios.insert((blob_meta.key, blob_meta.lsn), io); } let read_extend_residency = this.clone(); @@ -1037,8 +1033,7 @@ impl DeltaLayerInner { Ok(blobs_buf) => { let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter().rev() { - let sender = - senders.remove(&(meta.meta.key, meta.meta.lsn)).unwrap(); + let io = ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap(); if Some(meta.meta.key) == ignore_key_with_err { continue; @@ -1050,22 +1045,24 @@ impl DeltaLayerInner { Err(e) => { ignore_key_with_err = Some(meta.meta.key); - let _ = sender.send(Err(e)); + io.complete(Err(e)); continue; } }; - let _ = sender.send(Ok(OnDiskValue::WalRecordOrImage( + io.complete(Ok(OnDiskValue::WalRecordOrImage( blob_read.into_bytes(), ))); } - assert!(senders.is_empty()); + assert!(ios.is_empty()); } Err(err) => { - for (_, sender) in senders { - let _ = sender - .send(Err(std::io::Error::new(err.kind(), "vec read failed"))); + for (_, sender) in ios { + sender.complete(Err(std::io::Error::new( + err.kind(), + "vec read failed", + ))); } } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index a8abe037b8..c49281dc45 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -62,7 +62,6 @@ use std::ops::Range; use std::os::unix::prelude::FileExt; use std::str::FromStr; use std::sync::Arc; -use tokio::sync::oneshot; use tokio::sync::OnceCell; use tokio_stream::StreamExt; use tracing::*; @@ -75,7 +74,8 @@ use utils::{ use super::layer_name::ImageLayerName; use super::{ - AsLayerDesc, LayerName, OnDiskValue, PersistentLayerDesc, ResidentLayer, ValuesReconstructState, + AsLayerDesc, LayerName, OnDiskValue, OnDiskValueIo, PersistentLayerDesc, ResidentLayer, + ValuesReconstructState, }; /// @@ -588,15 +588,10 @@ impl ImageLayerInner { .into(); for read in reads.into_iter() { - let mut senders: HashMap< - (Key, Lsn), - oneshot::Sender>, - > = Default::default(); + let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default(); for (_, blob_meta) in read.blobs_at.as_slice() { - let (tx, rx) = oneshot::channel(); - senders.insert((blob_meta.key, blob_meta.lsn), tx); - - reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true, rx); + let io = reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true); + ios.insert((blob_meta.key, blob_meta.lsn), io); } let buf_size = read.size(); @@ -641,28 +636,29 @@ impl ImageLayerInner { Ok(blobs_buf) => { let view = BufView::new_slice(&blobs_buf.buf); for meta in blobs_buf.blobs.iter() { - let sender = - senders.remove(&(meta.meta.key, meta.meta.lsn)).unwrap(); + let io: OnDiskValueIo = + ios.remove(&(meta.meta.key, meta.meta.lsn)).unwrap(); let img_buf = meta.read(&view).await; let img_buf = match img_buf { Ok(img_buf) => img_buf, Err(e) => { - let _ = sender.send(Err(e)); + io.complete(Err(e)); continue; } }; - let _ = - sender.send(Ok(OnDiskValue::RawImage(img_buf.into_bytes()))); + io.complete(Ok(OnDiskValue::RawImage(img_buf.into_bytes()))); } - assert!(senders.is_empty()); + assert!(ios.is_empty()); } Err(err) => { - for (_, sender) in senders { - let _ = sender - .send(Err(std::io::Error::new(err.kind(), "vec read failed"))); + for (_, io) in ios { + io.complete(Err(std::io::Error::new( + err.kind(), + "vec read failed", + ))); } } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index d745ada6e7..61a0fdea8c 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -8,7 +8,7 @@ use crate::assert_u64_eq_usize::{u64_to_usize, U64IsUsize, UsizeIsU64}; use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; use crate::tenant::ephemeral_file::EphemeralFile; -use crate::tenant::storage_layer::OnDiskValue; +use crate::tenant::storage_layer::{OnDiskValue, OnDiskValueIo}; use crate::tenant::timeline::GetVectoredError; use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::{l0_flush, page_cache}; @@ -430,10 +430,7 @@ impl InMemoryLayer { read: vectored_dio_read::LogicalRead>, } let mut reads: HashMap> = HashMap::new(); - let mut senders: HashMap< - (Key, Lsn), - tokio::sync::oneshot::Sender>, - > = Default::default(); + let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default(); let lsn_range = self.start_lsn..end_lsn; @@ -460,9 +457,8 @@ impl InMemoryLayer { ), }); - let (tx, rx) = tokio::sync::oneshot::channel(); - senders.insert((key, *entry_lsn), tx); - reconstruct_state.update_key(&key, *entry_lsn, will_init, rx); + let io = reconstruct_state.update_key(&key, *entry_lsn, will_init); + ios.insert((key, *entry_lsn), io); if will_init { break; @@ -488,25 +484,22 @@ impl InMemoryLayer { for (key, value_reads) in reads { for ValueRead { entry_lsn, read } in value_reads { - let sender = senders - .remove(&(key, entry_lsn)) - .expect("sender must exist"); + let io = ios.remove(&(key, entry_lsn)).expect("sender must exist"); match read.into_result().expect("we run execute() above") { Err(e) => { - let _ = sender.send(Err(std::io::Error::new( + io.complete(Err(std::io::Error::new( e.kind(), "dio vec read failed", ))); } Ok(value_buf) => { - let _ = sender - .send(Ok(OnDiskValue::WalRecordOrImage(value_buf.into()))); + io.complete(Ok(OnDiskValue::WalRecordOrImage(value_buf.into()))); } } } } - assert!(senders.is_empty()); + assert!(ios.is_empty()); // Keep layer existent until this IO is done; // This is kinda forced for InMemoryLayer because we need to inner.read() anyway,