Refactor LayerMap::search to also search open and frozen layers.

This allows refactoring in get_reconstruct_data() which is needed by
PR #3228. That PR turns some functions async, and you cannot hold a
RwLock over 'await'. And it's enough to "drop(guard)" the lock guard,
it has to actually go out-of-scope to placate the compiler. This
refactoring allows dropping the lock on 'layers' at end of scope.
This commit is contained in:
Heikki Linnakangas
2023-01-03 22:59:18 +02:00
parent 70a0cce609
commit bba997ee00
3 changed files with 130 additions and 86 deletions

View File

@@ -26,7 +26,7 @@ use std::sync::Arc;
use tracing::*;
use utils::lsn::Lsn;
use super::storage_layer::{InMemoryLayer, Layer};
use super::storage_layer::{InMemoryLayer, InMemoryOrHistoricLayer, Layer};
///
/// LayerMap tracks what layers exist on a timeline.
@@ -241,7 +241,8 @@ where
/// Return value of LayerMap::search
pub struct SearchResult<L: ?Sized> {
pub layer: Arc<L>,
// FIXME: I wish this could be Arc<dyn Layer>. But I couldn't make that work.
pub layer: InMemoryOrHistoricLayer<L>,
pub lsn_floor: Lsn,
}
@@ -261,6 +262,30 @@ where
/// layer.
///
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
// First check if an open or frozen layer matches
if let Some(open_layer) = &self.open_layer {
let start_lsn = open_layer.get_lsn_range().start;
if end_lsn > start_lsn {
return Some(SearchResult {
layer: InMemoryOrHistoricLayer::InMemory(Arc::clone(open_layer)),
lsn_floor: start_lsn,
});
}
}
for frozen_layer in self.frozen_layers.iter().rev() {
let start_lsn = frozen_layer.get_lsn_range().start;
if end_lsn > start_lsn {
return Some(SearchResult {
layer: InMemoryOrHistoricLayer::InMemory(Arc::clone(frozen_layer)),
lsn_floor: start_lsn,
});
}
}
self.search_historic(key, end_lsn)
}
fn search_historic(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
// linear search
// Find the latest image layer that covers the given key
let mut latest_img: Option<Arc<L>> = None;
@@ -286,7 +311,7 @@ where
if Lsn(img_lsn.0 + 1) == end_lsn {
// found exact match
return Some(SearchResult {
layer: Arc::clone(l),
layer: InMemoryOrHistoricLayer::Historic(Arc::clone(l)),
lsn_floor: img_lsn,
});
}
@@ -349,13 +374,13 @@ where
);
Some(SearchResult {
lsn_floor,
layer: l,
layer: InMemoryOrHistoricLayer::Historic(l),
})
} else if let Some(l) = latest_img {
trace!("found img layer and no deltas for request on {key} at {end_lsn}");
Some(SearchResult {
lsn_floor: latest_img_lsn.unwrap(),
layer: l,
layer: InMemoryOrHistoricLayer::Historic(l),
})
} else {
trace!("no layer found for request on {key} at {end_lsn}");

View File

@@ -196,3 +196,38 @@ pub fn downcast_remote_layer(
None
}
}
pub enum InMemoryOrHistoricLayer<L: ?Sized> {
InMemory(Arc<InMemoryLayer>),
Historic(Arc<L>),
}
impl<L: ?Sized> InMemoryOrHistoricLayer<L>
where
L: PersistentLayer,
{
pub fn downcast_remote_layer(&self) -> Option<std::sync::Arc<RemoteLayer>> {
match self {
Self::InMemory(_) => None,
Self::Historic(l) => {
if l.is_remote_layer() {
Arc::clone(l).downcast_remote_layer()
} else {
None
}
}
}
}
pub fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
) -> Result<ValueReconstructResult> {
match self {
Self::InMemory(l) => l.get_value_reconstruct_data(key, lsn_range, reconstruct_data),
Self::Historic(l) => l.get_value_reconstruct_data(key, lsn_range, reconstruct_data),
}
}
}

View File

@@ -25,8 +25,8 @@ use std::time::{Duration, Instant, SystemTime};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, LayerFileName,
RemoteLayer,
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
InMemoryOrHistoricLayer, LayerFileName, RemoteLayer,
};
use crate::tenant::{
ephemeral_file::is_ephemeral_file,
@@ -1591,7 +1591,7 @@ trait TraversalLayerExt {
fn traversal_id(&self) -> TraversalId;
}
impl TraversalLayerExt for Arc<dyn PersistentLayer> {
impl<T: PersistentLayer + ?Sized> TraversalLayerExt for T {
fn traversal_id(&self) -> TraversalId {
match self.local_path() {
Some(local_path) => {
@@ -1621,6 +1621,15 @@ impl TraversalLayerExt for Arc<InMemoryLayer> {
}
}
impl TraversalLayerExt for InMemoryOrHistoricLayer<dyn PersistentLayer> {
fn traversal_id(&self) -> String {
match self {
Self::InMemory(l) => l.traversal_id(),
Self::Historic(l) => l.traversal_id(),
}
}
}
impl Timeline {
///
/// Get a handle to a Layer for reading.
@@ -1642,8 +1651,11 @@ impl Timeline {
// For debugging purposes, collect the path of layers that we traversed
// through. It's included in the error message if we fail to find the key.
let mut traversal_path =
Vec::<(ValueReconstructResult, Lsn, Box<dyn TraversalLayerExt>)>::new();
let mut traversal_path = Vec::<(
ValueReconstructResult,
Lsn,
Box<dyn TraversalLayerExt>,
)>::new();
let cached_lsn = if let Some((cached_lsn, _)) = &reconstruct_state.img {
*cached_lsn
@@ -1679,7 +1691,7 @@ impl Timeline {
Lsn(cont_lsn.0 - 1),
request_lsn,
timeline.ancestor_lsn
), traversal_path);
), &traversal_path);
}
prev_lsn = cont_lsn;
}
@@ -1689,7 +1701,7 @@ impl Timeline {
"could not find data for key {} at LSN {}, for request at LSN {}",
key, cont_lsn, request_lsn
),
traversal_path,
&traversal_path,
);
}
}
@@ -1708,82 +1720,54 @@ impl Timeline {
timeline_owned = ancestor;
timeline = &*timeline_owned;
prev_lsn = Lsn(u64::MAX);
continue;
continue 'outer;
}
let layers = timeline.layers.read().unwrap();
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
if let Some(open_layer) = &layers.open_layer {
let start_lsn = open_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on open layer {}", key, cont_lsn, open_layer.filename().display());
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match open_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
) {
Ok(result) => result,
Err(e) => return PageReconstructResult::from(e),
};
cont_lsn = lsn_floor;
traversal_path.push((result, cont_lsn, Box::new(open_layer.clone())));
continue;
}
}
for frozen_layer in layers.frozen_layers.iter().rev() {
let start_lsn = frozen_layer.get_lsn_range().start;
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match frozen_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
) {
Ok(result) => result,
Err(e) => return PageReconstructResult::from(e),
};
cont_lsn = lsn_floor;
traversal_path.push((result, cont_lsn, Box::new(frozen_layer.clone())));
continue 'outer;
}
}
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
//info!("CHECKING for {} at {} on historic layer {}", key, cont_lsn, layer.filename().display());
// If it's a remote layer, the caller can do the download and retry.
if let Some(remote_layer) = super::storage_layer::downcast_remote_layer(&layer) {
info!("need remote layer {}", layer.traversal_id());
return PageReconstructResult::NeedsDownload(
Weak::clone(&timeline.myself),
Arc::downgrade(&remote_layer),
);
}
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
) {
Ok(result) => result,
Err(e) => return PageReconstructResult::from(e),
loop {
let remote_layer = {
let layers = timeline.layers.read().unwrap();
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
// If it's a remote layer, download it and retry.
if let Some(remote_layer) = layer.downcast_remote_layer() {
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this.
remote_layer
} else {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
) {
Ok(result) => result,
Err(e) => return PageReconstructResult::from(e),
};
cont_lsn = lsn_floor;
traversal_path.push((result, cont_lsn, Box::new(layer)));
continue 'outer;
}
} else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
result = ValueReconstructResult::Continue;
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
continue 'outer;
} else {
// Nothing found
result = ValueReconstructResult::Missing;
continue 'outer;
}
};
cont_lsn = lsn_floor;
traversal_path.push((result, cont_lsn, Box::new(layer.clone())));
} else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
result = ValueReconstructResult::Continue;
cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
} else {
// Nothing found
result = ValueReconstructResult::Missing;
// The next layer doesn't exist locally. The caller can do the download and retry.
// (The control flow is a bit complicated here because we must drop the 'layers'
// lock before awaiting on the Future.)
info!("need remote layer {}", remote_layer.traversal_id());
return PageReconstructResult::NeedsDownload(
Weak::clone(&timeline.myself),
Arc::downgrade(&remote_layer),
);
}
}
}
@@ -3362,7 +3346,7 @@ where
/// to an error, as anyhow context information.
fn layer_traversal_error(
msg: String,
path: Vec<(ValueReconstructResult, Lsn, Box<dyn TraversalLayerExt>)>,
path: &[(ValueReconstructResult, Lsn, Box<dyn TraversalLayerExt>)],
) -> PageReconstructResult<()> {
// We want the original 'msg' to be the outermost context. The outermost context
// is the most high-level information, which also gets propagated to the client.