mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
carry reconstruct data
This commit is contained in:
@@ -6364,7 +6364,7 @@ mod tests {
|
||||
.await?;
|
||||
Ok(res.pop_last().map(|(k, v)| {
|
||||
assert_eq!(k, key);
|
||||
v.unwrap()
|
||||
v.unwrap().0
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -73,7 +73,7 @@ where
|
||||
/// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
|
||||
/// call, to collect more records.
|
||||
///
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
||||
pub struct ValueReconstructState {
|
||||
pub records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pub img: Option<(Lsn, Bytes)>,
|
||||
|
||||
@@ -910,7 +910,9 @@ impl Timeline {
|
||||
img: cached_page_img,
|
||||
};
|
||||
|
||||
self.get_impl(key, lsn, reconstruct_state, ctx).await
|
||||
self.get_impl(key, lsn, reconstruct_state, ctx)
|
||||
.await
|
||||
.map(|v| v.0)
|
||||
}
|
||||
GetImpl::Vectored => {
|
||||
let keyspace = KeySpace {
|
||||
@@ -949,7 +951,7 @@ impl Timeline {
|
||||
"Singular vectored get returned wrong key"
|
||||
)))
|
||||
} else {
|
||||
value
|
||||
value.map(|v| v.0)
|
||||
}
|
||||
}
|
||||
None => Err(PageReconstructError::MissingKey(MissingKeyError {
|
||||
@@ -973,7 +975,7 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
mut reconstruct_state: ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
) -> Result<(Bytes, ValueReconstructState), PageReconstructError> {
|
||||
// XXX: structured stats collection for layer eviction here.
|
||||
trace!(
|
||||
"get page request for {}@{} from task kind {:?}",
|
||||
@@ -1112,7 +1114,12 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
let Ok(res) = res else {
|
||||
return Err(res.unwrap_err());
|
||||
};
|
||||
Ok(BTreeMap::from_iter(
|
||||
res.into_iter().map(|(k, v)| (k, v.map(|v| v.0))),
|
||||
))
|
||||
}
|
||||
|
||||
/// Scan the keyspace and return all existing key-values in the keyspace. This currently uses vectored
|
||||
@@ -1176,7 +1183,12 @@ impl Timeline {
|
||||
recording.observe(throttled);
|
||||
}
|
||||
|
||||
vectored_res
|
||||
let Ok(vectored_res) = vectored_res else {
|
||||
return Err(vectored_res.unwrap_err());
|
||||
};
|
||||
Ok(BTreeMap::from_iter(
|
||||
vectored_res.into_iter().map(|(k, v)| (k, v.map(|v| v.0))),
|
||||
))
|
||||
}
|
||||
|
||||
/// Not subject to [`Self::timeline_get_throttle`].
|
||||
@@ -1185,7 +1197,10 @@ impl Timeline {
|
||||
keyspace: KeySpace,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
) -> Result<
|
||||
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
|
||||
GetVectoredError,
|
||||
> {
|
||||
let mut values = BTreeMap::new();
|
||||
|
||||
for range in keyspace.ranges {
|
||||
@@ -1244,7 +1259,10 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
) -> Result<
|
||||
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
|
||||
GetVectoredError,
|
||||
> {
|
||||
let get_kind = if keyspace.total_raw_size() == 1 {
|
||||
GetKind::Singular
|
||||
} else {
|
||||
@@ -1261,7 +1279,10 @@ impl Timeline {
|
||||
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
|
||||
.for_get_kind(get_kind)
|
||||
.start_timer();
|
||||
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
|
||||
let mut results: BTreeMap<
|
||||
Key,
|
||||
Result<(Bytes, ValueReconstructState), PageReconstructError>,
|
||||
> = BTreeMap::new();
|
||||
let layers_visited = reconstruct_state.get_layers_visited();
|
||||
|
||||
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
|
||||
@@ -1297,7 +1318,10 @@ impl Timeline {
|
||||
/// Not subject to [`Self::timeline_get_throttle`].
|
||||
pub(super) async fn validate_get_vectored_impl(
|
||||
&self,
|
||||
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
|
||||
vectored_res: &Result<
|
||||
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
|
||||
GetVectoredError,
|
||||
>,
|
||||
keyspace: KeySpace,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -1370,8 +1394,8 @@ impl Timeline {
|
||||
key: &Key,
|
||||
keyspace: &KeySpace,
|
||||
lsn: Lsn,
|
||||
seq: &Bytes,
|
||||
vec: &Bytes,
|
||||
(seq, seq_reconstruct_state): &(Bytes, ValueReconstructState),
|
||||
(vec, vec_reconstruct_state): &(Bytes, ValueReconstructState),
|
||||
) {
|
||||
if *key == AUX_FILES_KEY {
|
||||
// The value reconstruct of AUX_FILES_KEY from records is not deterministic
|
||||
@@ -1394,10 +1418,16 @@ impl Timeline {
|
||||
}
|
||||
} else {
|
||||
// All other keys should reconstruct deterministically, so we simply compare the blobs.
|
||||
assert_eq!(
|
||||
seq, vec,
|
||||
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
|
||||
);
|
||||
if seq != vec {
|
||||
assert_eq!(
|
||||
seq_reconstruct_state, vec_reconstruct_state,
|
||||
"Reconstruct state mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
|
||||
);
|
||||
assert_eq!(
|
||||
seq, vec,
|
||||
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4356,7 +4386,7 @@ impl Timeline {
|
||||
let mut total_kb_retrieved = 0;
|
||||
let mut total_keys_retrieved = 0;
|
||||
for (k, v) in data {
|
||||
let v = v.map_err(CreateImageLayersError::PageReconstructError)?;
|
||||
let (v, _) = v.map_err(CreateImageLayersError::PageReconstructError)?;
|
||||
total_kb_retrieved += KEY_SIZE + v.len();
|
||||
total_keys_retrieved += 1;
|
||||
new_data.insert(k, v);
|
||||
@@ -5151,7 +5181,7 @@ impl Timeline {
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
mut data: ValueReconstructState,
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
) -> Result<(Bytes, ValueReconstructState), PageReconstructError> {
|
||||
// Perform WAL redo if needed
|
||||
data.records.reverse();
|
||||
|
||||
@@ -5164,7 +5194,7 @@ impl Timeline {
|
||||
img_lsn,
|
||||
request_lsn,
|
||||
);
|
||||
Ok(img.clone())
|
||||
Ok((img.clone(), data))
|
||||
} else {
|
||||
Err(PageReconstructError::from(anyhow!(
|
||||
"base image for {key} at {request_lsn} not found"
|
||||
@@ -5196,6 +5226,8 @@ impl Timeline {
|
||||
|
||||
let last_rec_lsn = data.records.last().unwrap().0;
|
||||
|
||||
let ret_state = data.clone();
|
||||
|
||||
let img = match self
|
||||
.walredo_mgr
|
||||
.as_ref()
|
||||
@@ -5226,7 +5258,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(img)
|
||||
Ok((img, ret_state))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1064,7 +1064,7 @@ impl Timeline {
|
||||
img: base_image,
|
||||
records: delta_above_base_image,
|
||||
};
|
||||
let img = tline.reconstruct_value(key, horizon, state).await?;
|
||||
let (img, _) = tline.reconstruct_value(key, horizon, state).await?;
|
||||
Ok((keys_above_horizon, img))
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user