diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 17673e2cb9..1f5673d814 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -323,15 +323,16 @@ impl ValuesReconstructState { let key_done = match state.situation { ValueReconstructSituation::Complete => unreachable!(), ValueReconstructSituation::Continue => match future_value { - FutureValue::Img { rx } => { + FutureValue::ValueImage { rx } => { assert!(state.img.is_none()); state.img = Some((lsn, rx)); true } - FutureValue::WalRecord { will_init, rx } => { + FutureValue::ValueWalRecord { will_init, rx } => { state.records.push((lsn, rx)); will_init } + FutureValue::RawImage { rx } => todo!(), }, }; @@ -367,11 +368,14 @@ impl ValuesReconstructState { } enum FutureValue { - WalRecord { + RawImage { + rx: sync::oneshot::Receiver>, + }, + ValueWalRecord { will_init: bool, rx: sync::oneshot::Receiver>, }, - Img { + ValueImage { rx: sync::oneshot::Receiver>, }, } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 032543c4f0..8323efd692 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{ use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ - BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, + BlobFlag, BlobType, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadCoalesceMode, VectoredReadPlanner, }; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; @@ -934,7 +934,17 @@ impl DeltaLayerInner { range_end_handled = true; break; } else { - planner.handle(key, lsn, blob_ref.pos(), flag, blob_ref.will_init()); + planner.handle( + key, + lsn, + blob_ref.pos(), + flag, + if blob_ref.will_init() { + BlobType::ValueImage + } else { + BlobType::ValueImage + }, + ); } } @@ -1002,9 +1012,12 @@ impl DeltaLayerInner { reconstruct_state.update_key( &blob_meta.key, blob_meta.lsn, - super::FutureValue::WalRecord { - will_init: blob_meta.will_init, - rx, + match blob_meta.blob_type { + BlobType::RawImage => unreachable!(), + BlobType::ValueImage => super::FutureValue::ValueImage { rx }, + BlobType::ValueWalRecord { will_init } => { + super::FutureValue::ValueWalRecord { will_init, rx } + } }, ); } @@ -1179,7 +1192,7 @@ impl DeltaLayerInner { BlobMeta { key, lsn, - will_init: false, + blob_type: todo!(), }, start_offset..end_offset, )) diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 8abd8f2bb7..5c63270eb9 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -513,7 +513,7 @@ impl ImageLayerInner { range_end_handled = true; break; } else { - planner.handle(key, self.lsn, offset, flag, true); + planner.handle(key, self.lsn, offset, flag, crate::tenant::vectored_blob_io::BlobType::RawImage); } } @@ -591,7 +591,7 @@ impl ImageLayerInner { reconstruct_state.update_key( &blob_meta.key, blob_meta.lsn, - super::FutureValue::Img { rx }, + super::FutureValue::RawImage { rx }, ); } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index b6dc6c0ada..4d9970fb5e 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -467,11 +467,7 @@ impl InMemoryLayer { let (tx, rx) = tokio::sync::oneshot::channel(); senders.insert((key, *entry_lsn), tx); - reconstruct_state.update_key( - &key, - *entry_lsn, - crate::tenant::storage_layer::FutureValue::WalRecord { will_init, rx }, - ); + reconstruct_state.update_key(&key, *entry_lsn, todo!()); if will_init { break; diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 70578f84e3..719bd8a212 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -33,7 +33,7 @@ use crate::virtual_file::{self, VirtualFile}; pub struct BlobMeta { pub key: Key, pub lsn: Lsn, - pub will_init: bool, + pub blob_type: BlobType, } /// Blob offsets into [`VectoredBlobsBuf::buf`] @@ -346,6 +346,13 @@ pub enum BlobFlag { ReplaceAll, } +#[derive(Debug, Clone, Copy)] +pub(crate) enum BlobType { + RawImage, + ValueImage, + ValueWalRecord{ will_init: bool }, +} + /// Planner for vectored blob reads. /// /// Blob offsets are received via [`VectoredReadPlanner::handle`] @@ -357,9 +364,9 @@ pub enum BlobFlag { pub struct VectoredReadPlanner { // Track all the blob offsets. Start offsets must be ordered. // Note: last bool is will_init - blobs: BTreeMap>, + blobs: BTreeMap>, // Arguments for previous blob passed into [`VectoredReadPlanner::handle`] - prev: Option<(Key, Lsn, u64, BlobFlag, bool)>, + prev: Option<(Key, Lsn, u64, BlobFlag, BlobType)>, max_read_size: usize, @@ -394,12 +401,12 @@ impl VectoredReadPlanner { /// This is used for WAL records that `will_init`. /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens /// if the blob is cached. - pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag, will_init: bool) { + pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag, blob_type: BlobType) { // Implementation note: internally lag behind by one blob such that // we have a start and end offset when initialising [`VectoredRead`] - let (prev_key, prev_lsn, prev_offset, prev_flag, prev_will_init) = match self.prev { + let (prev_key, prev_lsn, prev_offset, prev_flag, prev_blob_type) = match self.prev { None => { - self.prev = Some((key, lsn, offset, flag, will_init)); + self.prev = Some((key, lsn, offset, flag, blob_type)); return; } Some(prev) => prev, @@ -411,21 +418,21 @@ impl VectoredReadPlanner { prev_offset, offset, prev_flag, - prev_will_init, + prev_blob_type, ); - self.prev = Some((key, lsn, offset, flag, will_init)); + self.prev = Some((key, lsn, offset, flag, blob_type)); } pub fn handle_range_end(&mut self, offset: u64) { - if let Some((prev_key, prev_lsn, prev_offset, prev_flag, prev_will_init)) = self.prev { + if let Some((prev_key, prev_lsn, prev_offset, prev_flag, prev_blob_type)) = self.prev { self.add_blob( prev_key, prev_lsn, prev_offset, offset, prev_flag, - prev_will_init, + prev_blob_type, ); } @@ -439,17 +446,17 @@ impl VectoredReadPlanner { start_offset: u64, end_offset: u64, flag: BlobFlag, - will_init: bool, + blob_type: BlobType, ) { match flag { BlobFlag::None => { let blobs_for_key = self.blobs.entry(key).or_default(); - blobs_for_key.push((lsn, start_offset, end_offset, will_init)); + blobs_for_key.push((lsn, start_offset, end_offset, blob_type)); } BlobFlag::ReplaceAll => { let blobs_for_key = self.blobs.entry(key).or_default(); blobs_for_key.clear(); - blobs_for_key.push((lsn, start_offset, end_offset, will_init)); + blobs_for_key.push((lsn, start_offset, end_offset, blob_type)); } BlobFlag::Ignore => {} } @@ -460,7 +467,7 @@ impl VectoredReadPlanner { let mut reads = Vec::new(); for (key, blobs_for_key) in self.blobs { - for (lsn, start_offset, end_offset, will_init) in blobs_for_key { + for (lsn, start_offset, end_offset, blob_type) in blobs_for_key { let extended = match &mut current_read_builder { Some(read_builder) => read_builder.extend( start_offset, @@ -468,7 +475,7 @@ impl VectoredReadPlanner { BlobMeta { key, lsn, - will_init, + blob_type, }, ), None => VectoredReadExtended::No, @@ -481,7 +488,7 @@ impl VectoredReadPlanner { BlobMeta { key, lsn, - will_init, + blob_type, }, self.max_read_size, self.mode, @@ -699,7 +706,6 @@ impl StreamingVectoredReadPlanner { start_offset: u64, end_offset: u64, is_last_blob_in_read: bool, - // destination: oneshot::Sender>, ) -> Option { match &mut self.read_builder { Some(read_builder) => { @@ -709,7 +715,7 @@ impl StreamingVectoredReadPlanner { BlobMeta { key, lsn, - will_init: false, + blob_type: todo!(), }, ); assert_eq!(extended, VectoredReadExtended::Yes); @@ -722,7 +728,7 @@ impl StreamingVectoredReadPlanner { BlobMeta { key, lsn, - will_init: false, + blob_type: todo!(), }, self.mode, ))