diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 1ae3cfaf25..17673e2cb9 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -312,8 +312,7 @@ impl ValuesReconstructState { &mut self, key: &Key, lsn: Lsn, - completes: bool, - value: sync::oneshot::Receiver>, + future_value: FutureValue, ) -> ValueReconstructSituation { let state = self .keys @@ -321,14 +320,22 @@ impl ValuesReconstructState { .or_insert(Ok(VectoredValueReconstructState::default())); if let Ok(state) = state { - match state.situation { + let key_done = match state.situation { ValueReconstructSituation::Complete => unreachable!(), - ValueReconstructSituation::Continue => { - state.records.push((lsn, value)); - } - } + ValueReconstructSituation::Continue => match future_value { + FutureValue::Img { rx } => { + assert!(state.img.is_none()); + state.img = Some((lsn, rx)); + true + } + FutureValue::WalRecord { will_init, rx } => { + state.records.push((lsn, rx)); + will_init + } + }, + }; - if completes && state.situation == ValueReconstructSituation::Continue { + if key_done && state.situation == ValueReconstructSituation::Continue { state.situation = ValueReconstructSituation::Complete; self.keys_done.add_key(*key); } @@ -359,6 +366,16 @@ impl ValuesReconstructState { } } +enum FutureValue { + WalRecord { + will_init: bool, + rx: sync::oneshot::Receiver>, + }, + Img { + rx: sync::oneshot::Receiver>, + }, +} + impl Default for ValuesReconstructState { fn default() -> Self { Self::new() diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index d68dec4754..032543c4f0 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -1002,8 +1002,10 @@ impl DeltaLayerInner { reconstruct_state.update_key( &blob_meta.key, blob_meta.lsn, - blob_meta.will_init, - rx, + super::FutureValue::WalRecord { + will_init: blob_meta.will_init, + rx, + }, ); } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 609a033426..8abd8f2bb7 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -588,8 +588,11 @@ impl ImageLayerInner { for (_, blob_meta) in read.blobs_at.as_slice() { let (tx, rx) = oneshot::channel(); senders.insert((blob_meta.key, blob_meta.lsn), tx); - - reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true, rx); + reconstruct_state.update_key( + &blob_meta.key, + blob_meta.lsn, + super::FutureValue::Img { rx }, + ); } let buf_size = read.size(); @@ -624,9 +627,7 @@ impl ImageLayerInner { let sender = senders .remove(&(meta.meta.key, meta.meta.lsn)) .expect("sender must exist"); - // TODO: this is silly - sort it out - let bytes = Value::ser(&Value::Image(Bytes::copy_from_slice(buf))) - .expect("stupid but correct"); + let bytes = Bytes::copy_from_slice(buf); let _ = sender.send(Ok(bytes.into())); } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 70c42d44d4..b6dc6c0ada 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -467,7 +467,11 @@ impl InMemoryLayer { let (tx, rx) = tokio::sync::oneshot::channel(); senders.insert((key, *entry_lsn), tx); - reconstruct_state.update_key(&key, *entry_lsn, will_init, rx); + reconstruct_state.update_key( + &key, + *entry_lsn, + crate::tenant::storage_layer::FutureValue::WalRecord { will_init, rx }, + ); if will_init { break;