diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 1f5673d814..17673e2cb9 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -323,16 +323,15 @@ impl ValuesReconstructState { let key_done = match state.situation { ValueReconstructSituation::Complete => unreachable!(), ValueReconstructSituation::Continue => match future_value { - FutureValue::ValueImage { rx } => { + FutureValue::Img { rx } => { assert!(state.img.is_none()); state.img = Some((lsn, rx)); true } - FutureValue::ValueWalRecord { will_init, rx } => { + FutureValue::WalRecord { will_init, rx } => { state.records.push((lsn, rx)); will_init } - FutureValue::RawImage { rx } => todo!(), }, }; @@ -368,14 +367,11 @@ impl ValuesReconstructState { } enum FutureValue { - RawImage { - rx: sync::oneshot::Receiver>, - }, - ValueWalRecord { + WalRecord { will_init: bool, rx: sync::oneshot::Receiver>, }, - ValueImage { + Img { rx: sync::oneshot::Receiver>, }, } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 8323efd692..032543c4f0 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{ use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT; use crate::tenant::timeline::GetVectoredError; use crate::tenant::vectored_blob_io::{ - BlobFlag, BlobType, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, + BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadCoalesceMode, VectoredReadPlanner, }; use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt}; @@ -934,17 +934,7 @@ impl DeltaLayerInner { range_end_handled = true; break; } else { - planner.handle( - key, - lsn, - blob_ref.pos(), - flag, - if blob_ref.will_init() { - BlobType::ValueImage - } else { - BlobType::ValueImage - }, - ); + planner.handle(key, lsn, blob_ref.pos(), flag, blob_ref.will_init()); } } @@ -1012,12 +1002,9 @@ impl DeltaLayerInner { reconstruct_state.update_key( &blob_meta.key, blob_meta.lsn, - match blob_meta.blob_type { - BlobType::RawImage => unreachable!(), - BlobType::ValueImage => super::FutureValue::ValueImage { rx }, - BlobType::ValueWalRecord { will_init } => { - super::FutureValue::ValueWalRecord { will_init, rx } - } + super::FutureValue::WalRecord { + will_init: blob_meta.will_init, + rx, }, ); } @@ -1192,7 +1179,7 @@ impl DeltaLayerInner { BlobMeta { key, lsn, - blob_type: todo!(), + will_init: false, }, start_offset..end_offset, )) diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 5c63270eb9..8abd8f2bb7 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -513,7 +513,7 @@ impl ImageLayerInner { range_end_handled = true; break; } else { - planner.handle(key, self.lsn, offset, flag, crate::tenant::vectored_blob_io::BlobType::RawImage); + planner.handle(key, self.lsn, offset, flag, true); } } @@ -591,7 +591,7 @@ impl ImageLayerInner { reconstruct_state.update_key( &blob_meta.key, blob_meta.lsn, - super::FutureValue::RawImage { rx }, + super::FutureValue::Img { rx }, ); } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 4d9970fb5e..b6dc6c0ada 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -467,7 +467,11 @@ impl InMemoryLayer { let (tx, rx) = tokio::sync::oneshot::channel(); senders.insert((key, *entry_lsn), tx); - reconstruct_state.update_key(&key, *entry_lsn, todo!()); + reconstruct_state.update_key( + &key, + *entry_lsn, + crate::tenant::storage_layer::FutureValue::WalRecord { will_init, rx }, + ); if will_init { break; diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index 719bd8a212..70578f84e3 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -33,7 +33,7 @@ use crate::virtual_file::{self, VirtualFile}; pub struct BlobMeta { pub key: Key, pub lsn: Lsn, - pub blob_type: BlobType, + pub will_init: bool, } /// Blob offsets into [`VectoredBlobsBuf::buf`] @@ -346,13 +346,6 @@ pub enum BlobFlag { ReplaceAll, } -#[derive(Debug, Clone, Copy)] -pub(crate) enum BlobType { - RawImage, - ValueImage, - ValueWalRecord{ will_init: bool }, -} - /// Planner for vectored blob reads. /// /// Blob offsets are received via [`VectoredReadPlanner::handle`] @@ -364,9 +357,9 @@ pub(crate) enum BlobType { pub struct VectoredReadPlanner { // Track all the blob offsets. Start offsets must be ordered. // Note: last bool is will_init - blobs: BTreeMap>, + blobs: BTreeMap>, // Arguments for previous blob passed into [`VectoredReadPlanner::handle`] - prev: Option<(Key, Lsn, u64, BlobFlag, BlobType)>, + prev: Option<(Key, Lsn, u64, BlobFlag, bool)>, max_read_size: usize, @@ -401,12 +394,12 @@ impl VectoredReadPlanner { /// This is used for WAL records that `will_init`. /// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens /// if the blob is cached. - pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag, blob_type: BlobType) { + pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag, will_init: bool) { // Implementation note: internally lag behind by one blob such that // we have a start and end offset when initialising [`VectoredRead`] - let (prev_key, prev_lsn, prev_offset, prev_flag, prev_blob_type) = match self.prev { + let (prev_key, prev_lsn, prev_offset, prev_flag, prev_will_init) = match self.prev { None => { - self.prev = Some((key, lsn, offset, flag, blob_type)); + self.prev = Some((key, lsn, offset, flag, will_init)); return; } Some(prev) => prev, @@ -418,21 +411,21 @@ impl VectoredReadPlanner { prev_offset, offset, prev_flag, - prev_blob_type, + prev_will_init, ); - self.prev = Some((key, lsn, offset, flag, blob_type)); + self.prev = Some((key, lsn, offset, flag, will_init)); } pub fn handle_range_end(&mut self, offset: u64) { - if let Some((prev_key, prev_lsn, prev_offset, prev_flag, prev_blob_type)) = self.prev { + if let Some((prev_key, prev_lsn, prev_offset, prev_flag, prev_will_init)) = self.prev { self.add_blob( prev_key, prev_lsn, prev_offset, offset, prev_flag, - prev_blob_type, + prev_will_init, ); } @@ -446,17 +439,17 @@ impl VectoredReadPlanner { start_offset: u64, end_offset: u64, flag: BlobFlag, - blob_type: BlobType, + will_init: bool, ) { match flag { BlobFlag::None => { let blobs_for_key = self.blobs.entry(key).or_default(); - blobs_for_key.push((lsn, start_offset, end_offset, blob_type)); + blobs_for_key.push((lsn, start_offset, end_offset, will_init)); } BlobFlag::ReplaceAll => { let blobs_for_key = self.blobs.entry(key).or_default(); blobs_for_key.clear(); - blobs_for_key.push((lsn, start_offset, end_offset, blob_type)); + blobs_for_key.push((lsn, start_offset, end_offset, will_init)); } BlobFlag::Ignore => {} } @@ -467,7 +460,7 @@ impl VectoredReadPlanner { let mut reads = Vec::new(); for (key, blobs_for_key) in self.blobs { - for (lsn, start_offset, end_offset, blob_type) in blobs_for_key { + for (lsn, start_offset, end_offset, will_init) in blobs_for_key { let extended = match &mut current_read_builder { Some(read_builder) => read_builder.extend( start_offset, @@ -475,7 +468,7 @@ impl VectoredReadPlanner { BlobMeta { key, lsn, - blob_type, + will_init, }, ), None => VectoredReadExtended::No, @@ -488,7 +481,7 @@ impl VectoredReadPlanner { BlobMeta { key, lsn, - blob_type, + will_init, }, self.max_read_size, self.mode, @@ -706,6 +699,7 @@ impl StreamingVectoredReadPlanner { start_offset: u64, end_offset: u64, is_last_blob_in_read: bool, + // destination: oneshot::Sender>, ) -> Option { match &mut self.read_builder { Some(read_builder) => { @@ -715,7 +709,7 @@ impl StreamingVectoredReadPlanner { BlobMeta { key, lsn, - blob_type: todo!(), + will_init: false, }, ); assert_eq!(extended, VectoredReadExtended::Yes); @@ -728,7 +722,7 @@ impl StreamingVectoredReadPlanner { BlobMeta { key, lsn, - blob_type: todo!(), + will_init: false, }, self.mode, ))