fixup image deserialization

This commit is contained in:
Vlad Lazar
2024-09-12 19:24:41 +01:00
parent 6f20726610
commit 2a5336b9ab
6 changed files with 50 additions and 23 deletions

View File

@@ -73,6 +73,21 @@ impl ValueBytes {
Ok(raw[8] == 1)
}
pub(crate) fn is_image(raw: &[u8]) -> Result<bool, InvalidInput> {
if raw.len() < 12 {
return Err(InvalidInput::TooShortValue);
}
let value_discriminator = &raw[0..4];
if value_discriminator == [0, 0, 0, 0] {
// Value::Image always initializes
return Ok(true);
}
Ok(false)
}
}
#[cfg(test)]

View File

@@ -13,7 +13,7 @@ use utils::bin_ser::BeSer;
pub mod split_writer;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;
use crate::repository::{Value, ValueBytes};
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use pageserver_api::key::Key;
@@ -110,15 +110,23 @@ pub(crate) async fn convert(
match fut.await {
Ok(res) => match res {
Ok(bytes) => {
let value = Value::des(&bytes)
.map_err(|err| PageReconstructError::Other(err.into()))?;
let is_image = ValueBytes::is_image(&bytes).map_err(|err| {
PageReconstructError::Other(anyhow::anyhow!(
"Failed to check image discriminator: {err:?}"
))
})?;
match value {
Value::Image(img) => {
to.img = Some((lsn, img));
}
Value::WalRecord(rec) => {
if is_image {
to.img = Some((lsn, bytes));
} else {
let value = Value::des(&bytes)
.map_err(|err| PageReconstructError::Other(err.into()))?;
if let Value::WalRecord(rec) = value {
to.records.push((lsn, rec));
} else {
return Err(PageReconstructError::Other(anyhow::anyhow!(
"Deserialized to image"
)));
}
}
}

View File

@@ -1028,7 +1028,8 @@ impl DeltaLayerInner {
}
Err(err) => {
for (_, sender) in senders {
let _ = sender.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
let _ = sender
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
}
}
}

View File

@@ -589,12 +589,7 @@ impl ImageLayerInner {
let (tx, rx) = oneshot::channel();
senders.insert((blob_meta.key, blob_meta.lsn), tx);
reconstruct_state.update_key(
&blob_meta.key,
blob_meta.lsn,
true,
rx,
);
reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true, rx);
}
let buf_size = read.size();
@@ -636,7 +631,8 @@ impl ImageLayerInner {
}
Err(err) => {
for (_, sender) in senders {
let _ = sender.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
let _ = sender
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
}
}
}

View File

@@ -34,9 +34,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::RwLock;
use super::{
DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState,
};
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
pub(crate) mod vectored_dio_read;
@@ -502,7 +500,8 @@ impl InMemoryLayer {
let sender = senders
.remove(&(key, entry_lsn))
.expect("sender must exist");
let _ = sender.send(Err(std::io::Error::new(e.kind(), "dio vec read failed")));
let _ = sender
.send(Err(std::io::Error::new(e.kind(), "dio vec read failed")));
}
Ok(value_buf) => {
let _ = sender.send(Ok(value_buf.into()));

View File

@@ -1140,7 +1140,10 @@ impl Timeline {
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
async move {
let state = res.expect("Read path is infallible");
assert!(matches!(state.situation, ValueReconstructSituation::Complete));
assert!(matches!(
state.situation,
ValueReconstructSituation::Complete
));
let converted = match convert(state).await {
Ok(ok) => ok,
@@ -1149,12 +1152,17 @@ impl Timeline {
}
};
(key, walredo_self.reconstruct_value(key, lsn, converted).await)
(
key,
walredo_self.reconstruct_value(key, lsn, converted).await,
)
}
});
}
let results = futs.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>().await;
let results = futs
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
.await;
reconstruct_timer.stop_and_record();