mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
prepare Timeline::get_reconstruct_data for becoming async (#3271)
This patch restructures the code so that PR https://github.com/neondatabase/neon/pull/3228 can seamlessly replace the return PageReconstructResult::NeedsDownload with a download_remote_layer().await. Background: PR https://github.com/neondatabase/neon/pull/3228 will turn get_reconstruct_data() async and do the on-demand download right in place, instead of returning a PageReconstructResult::NeedsDownload. Current rustc requires that the layers lock guard be not in scope across an await point. For on-demand download inside get_reconstruct_data(), we need to do download_remote_layer().await. Supersedes https://github.com/neondatabase/neon/pull/3260 See my comment there: https://github.com/neondatabase/neon/pull/3260#issuecomment-1370752407 Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
This commit is contained in:
committed by
GitHub
parent
af9425394f
commit
3526323bc4
@@ -260,8 +260,10 @@ where
|
||||
/// contain the version, even if it's missing from the returned
|
||||
/// layer.
|
||||
///
|
||||
/// NOTE: This only searches the 'historic' layers, *not* the
|
||||
/// 'open' and 'frozen' layers!
|
||||
///
|
||||
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
|
||||
// linear search
|
||||
// Find the latest image layer that covers the given key
|
||||
let mut latest_img: Option<Arc<L>> = None;
|
||||
let mut latest_img_lsn: Option<Lsn> = None;
|
||||
|
||||
@@ -1642,8 +1642,7 @@ impl Timeline {
|
||||
|
||||
// For debugging purposes, collect the path of layers that we traversed
|
||||
// through. It's included in the error message if we fail to find the key.
|
||||
let mut traversal_path =
|
||||
Vec::<(ValueReconstructResult, Lsn, Box<dyn TraversalLayerExt>)>::new();
|
||||
let mut traversal_path = Vec::<TraversalPathItem>::new();
|
||||
|
||||
let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
|
||||
*cached_lsn
|
||||
@@ -1708,82 +1707,132 @@ impl Timeline {
|
||||
timeline_owned = ancestor;
|
||||
timeline = &*timeline_owned;
|
||||
prev_lsn = Lsn(u64::MAX);
|
||||
continue;
|
||||
continue 'outer;
|
||||
}
|
||||
|
||||
let layers = timeline.layers.read().unwrap();
|
||||
#[allow(unused_labels, clippy::never_loop)] // see comment at bottom of this loop
|
||||
'layer_map_search: loop {
|
||||
let remote_layer = {
|
||||
let layers = timeline.layers.read().unwrap();
|
||||
|
||||
// Check the open and frozen in-memory layers first, in order from newest
|
||||
// to oldest.
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let start_lsn = open_layer.get_lsn_range().start;
|
||||
if cont_lsn > start_lsn {
|
||||
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match open_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
Err(e) => return PageReconstructResult::from(e),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
traversal_path.push((result, cont_lsn, Box::new(open_layer.clone())));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
for frozen_layer in layers.frozen_layers.iter().rev() {
|
||||
let start_lsn = frozen_layer.get_lsn_range().start;
|
||||
if cont_lsn > start_lsn {
|
||||
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match frozen_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
Err(e) => return PageReconstructResult::from(e),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
traversal_path.push((result, cont_lsn, Box::new(frozen_layer.clone())));
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
// Check the open and frozen in-memory layers first, in order from newest
|
||||
// to oldest.
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let start_lsn = open_layer.get_lsn_range().start;
|
||||
if cont_lsn > start_lsn {
|
||||
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match open_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
Err(e) => return PageReconstructResult::from(e),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
traversal_path.push((
|
||||
result,
|
||||
cont_lsn,
|
||||
Box::new({
|
||||
let open_layer = Arc::clone(open_layer);
|
||||
move || open_layer.traversal_id()
|
||||
}),
|
||||
));
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
for frozen_layer in layers.frozen_layers.iter().rev() {
|
||||
let start_lsn = frozen_layer.get_lsn_range().start;
|
||||
if cont_lsn > start_lsn {
|
||||
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
|
||||
let lsn_floor = max(cached_lsn + 1, start_lsn);
|
||||
result = match frozen_layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
Err(e) => return PageReconstructResult::from(e),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
traversal_path.push((
|
||||
result,
|
||||
cont_lsn,
|
||||
Box::new({
|
||||
let frozen_layer = Arc::clone(frozen_layer);
|
||||
move || frozen_layer.traversal_id()
|
||||
}),
|
||||
));
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
|
||||
//info!("CHECKING for {} at {} on historic layer {}", key, cont_lsn, layer.filename().display());
|
||||
|
||||
// If it's a remote layer, the caller can do the download and retry.
|
||||
if let Some(remote_layer) = super::storage_layer::downcast_remote_layer(&layer) {
|
||||
info!("need remote layer {}", layer.traversal_id());
|
||||
return PageReconstructResult::NeedsDownload(
|
||||
Weak::clone(&timeline.myself),
|
||||
Arc::downgrade(&remote_layer),
|
||||
);
|
||||
}
|
||||
|
||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||
result = match layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
Err(e) => return PageReconstructResult::from(e),
|
||||
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
|
||||
// If it's a remote layer, download it and retry.
|
||||
if let Some(remote_layer) =
|
||||
super::storage_layer::downcast_remote_layer(&layer)
|
||||
{
|
||||
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
|
||||
// we downloaded / would need to download this layer.
|
||||
remote_layer // download happens outside the scope of `layers` guard object
|
||||
} else {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||
result = match layer.get_value_reconstruct_data(
|
||||
key,
|
||||
lsn_floor..cont_lsn,
|
||||
reconstruct_state,
|
||||
) {
|
||||
Ok(result) => result,
|
||||
Err(e) => return PageReconstructResult::from(e),
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
traversal_path.push((
|
||||
result,
|
||||
cont_lsn,
|
||||
Box::new({
|
||||
let layer = Arc::clone(&layer);
|
||||
move || layer.traversal_id()
|
||||
}),
|
||||
));
|
||||
continue 'outer;
|
||||
}
|
||||
} else if timeline.ancestor_timeline.is_some() {
|
||||
// Nothing on this timeline. Traverse to parent
|
||||
result = ValueReconstructResult::Continue;
|
||||
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
|
||||
continue 'outer;
|
||||
} else {
|
||||
// Nothing found
|
||||
result = ValueReconstructResult::Missing;
|
||||
continue 'outer;
|
||||
}
|
||||
};
|
||||
cont_lsn = lsn_floor;
|
||||
traversal_path.push((result, cont_lsn, Box::new(layer.clone())));
|
||||
} else if timeline.ancestor_timeline.is_some() {
|
||||
// Nothing on this timeline. Traverse to parent
|
||||
result = ValueReconstructResult::Continue;
|
||||
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
|
||||
} else {
|
||||
// Nothing found
|
||||
result = ValueReconstructResult::Missing;
|
||||
// Indicate to the caller that we need remote_layer replaced with a downloaded
|
||||
// layer in the layer map. The control flow could be a lot simpler, but the point
|
||||
// of this commit is to prepare this function to
|
||||
// 1. become async
|
||||
// 2. do the download right here, using
|
||||
// ```
|
||||
// download_remote_layer().await?;
|
||||
// continue 'layer_map_search;
|
||||
// ```
|
||||
// For (2), current rustc requires that the layers lock guard is not in scope.
|
||||
// Hence, the complicated control flow.
|
||||
let remote_layer_as_persistent: Arc<dyn PersistentLayer> =
|
||||
Arc::clone(&remote_layer) as Arc<dyn PersistentLayer>;
|
||||
info!(
|
||||
"need remote layer {}",
|
||||
remote_layer_as_persistent.traversal_id()
|
||||
);
|
||||
return PageReconstructResult::NeedsDownload(
|
||||
Weak::clone(&timeline.myself),
|
||||
Arc::downgrade(&remote_layer),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3358,22 +3407,25 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
type TraversalPathItem = (
|
||||
ValueReconstructResult,
|
||||
Lsn,
|
||||
Box<dyn FnOnce() -> TraversalId>,
|
||||
);
|
||||
|
||||
/// Helper function for get_reconstruct_data() to add the path of layers traversed
|
||||
/// to an error, as anyhow context information.
|
||||
fn layer_traversal_error(
|
||||
msg: String,
|
||||
path: Vec<(ValueReconstructResult, Lsn, Box<dyn TraversalLayerExt>)>,
|
||||
) -> PageReconstructResult<()> {
|
||||
fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageReconstructResult<()> {
|
||||
// We want the original 'msg' to be the outermost context. The outermost context
|
||||
// is the most high-level information, which also gets propagated to the client.
|
||||
let mut msg_iter = path
|
||||
.iter()
|
||||
.into_iter()
|
||||
.map(|(r, c, l)| {
|
||||
format!(
|
||||
"layer traversal: result {:?}, cont_lsn {}, layer: {}",
|
||||
r,
|
||||
c,
|
||||
l.traversal_id(),
|
||||
l(),
|
||||
)
|
||||
})
|
||||
.chain(std::iter::once(msg));
|
||||
|
||||
Reference in New Issue
Block a user