diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index e4ebafd927..a7c155ef82 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -73,6 +73,21 @@ impl ValueBytes { Ok(raw[8] == 1) } + + pub(crate) fn is_image(raw: &[u8]) -> Result { + if raw.len() < 12 { + return Err(InvalidInput::TooShortValue); + } + + let value_discriminator = &raw[0..4]; + + if value_discriminator == [0, 0, 0, 0] { + // Value::Image always initializes + return Ok(true); + } + + Ok(false) + } } #[cfg(test)] diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 1ae372c0d8..6b12c89c53 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -13,7 +13,7 @@ use utils::bin_ser::BeSer; pub mod split_writer; use crate::context::{AccessStatsBehavior, RequestContext}; -use crate::repository::Value; +use crate::repository::{Value, ValueBytes}; use crate::walrecord::NeonWalRecord; use bytes::Bytes; use pageserver_api::key::Key; @@ -110,15 +110,23 @@ pub(crate) async fn convert( match fut.await { Ok(res) => match res { Ok(bytes) => { - let value = Value::des(&bytes) - .map_err(|err| PageReconstructError::Other(err.into()))?; + let is_image = ValueBytes::is_image(&bytes).map_err(|err| { + PageReconstructError::Other(anyhow::anyhow!( + "Failed to check image discriminator: {err:?}" + )) + })?; - match value { - Value::Image(img) => { - to.img = Some((lsn, img)); - } - Value::WalRecord(rec) => { + if is_image { + to.img = Some((lsn, bytes)); + } else { + let value = Value::des(&bytes) + .map_err(|err| PageReconstructError::Other(err.into()))?; + if let Value::WalRecord(rec) = value { to.records.push((lsn, rec)); + } else { + return Err(PageReconstructError::Other(anyhow::anyhow!( + "Deserialized to image" + ))); } } } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 05c22b3344..ac57e14b37 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -1028,7 +1028,8 @@ impl DeltaLayerInner { } Err(err) => { for (_, sender) in senders { - let _ = sender.send(Err(std::io::Error::new(err.kind(), "vec read failed"))); + let _ = sender + .send(Err(std::io::Error::new(err.kind(), "vec read failed"))); } } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index e795ad43cd..999e9fed4d 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -589,12 +589,7 @@ impl ImageLayerInner { let (tx, rx) = oneshot::channel(); senders.insert((blob_meta.key, blob_meta.lsn), tx); - reconstruct_state.update_key( - &blob_meta.key, - blob_meta.lsn, - true, - rx, - ); + reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true, rx); } let buf_size = read.size(); @@ -636,7 +631,8 @@ impl ImageLayerInner { } Err(err) => { for (_, sender) in senders { - let _ = sender.send(Err(std::io::Error::new(err.kind(), "vec read failed"))); + let _ = sender + .send(Err(std::io::Error::new(err.kind(), "vec read failed"))); } } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 0d2f118597..e60ac3452d 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -34,9 +34,7 @@ use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::atomic::{AtomicU64, AtomicUsize}; use tokio::sync::RwLock; -use super::{ - DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState, -}; +use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState}; pub(crate) mod vectored_dio_read; @@ -502,7 +500,8 @@ impl InMemoryLayer { let sender = senders .remove(&(key, entry_lsn)) .expect("sender must exist"); - let _ = sender.send(Err(std::io::Error::new(e.kind(), "dio vec read failed"))); + let _ = sender + .send(Err(std::io::Error::new(e.kind(), "dio vec read failed"))); } Ok(value_buf) => { let _ = sender.send(Ok(value_buf.into())); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 85fb3f0d12..d3ca5411ae 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1140,7 +1140,10 @@ impl Timeline { let walredo_self = self.myself.upgrade().expect("&self method holds the arc"); async move { let state = res.expect("Read path is infallible"); - assert!(matches!(state.situation, ValueReconstructSituation::Complete)); + assert!(matches!( + state.situation, + ValueReconstructSituation::Complete + )); let converted = match convert(state).await { Ok(ok) => ok, @@ -1149,12 +1152,17 @@ impl Timeline { } }; - (key, walredo_self.reconstruct_value(key, lsn, converted).await) + ( + key, + walredo_self.reconstruct_value(key, lsn, converted).await, + ) } }); } - let results = futs.collect::>>().await; + let results = futs + .collect::>>() + .await; reconstruct_timer.stop_and_record();