mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
@@ -247,6 +247,29 @@ impl LayerMap {
|
||||
/// 'open' and 'frozen' layers!
|
||||
///
|
||||
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
|
||||
self.search_incremental_inner(key, end_lsn).map(|(x, _)| x)
|
||||
}
|
||||
|
||||
pub fn search_incremental(
|
||||
&self,
|
||||
key: Key,
|
||||
end_lsn: Lsn,
|
||||
force_incremental: bool,
|
||||
) -> Option<(SearchResult, Option<SearchResult>)> {
|
||||
self.search_incremental_inner(key, end_lsn).map(|(x, y)| {
|
||||
if force_incremental {
|
||||
(y.unwrap(), None)
|
||||
} else {
|
||||
(x, y)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn search_incremental_inner(
|
||||
&self,
|
||||
key: Key,
|
||||
end_lsn: Lsn,
|
||||
) -> Option<(SearchResult, Option<SearchResult>)> {
|
||||
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
|
||||
let latest_delta = version.delta_coverage.query(key.to_i128());
|
||||
let latest_image = version.image_coverage.query(key.to_i128());
|
||||
@@ -255,34 +278,59 @@ impl LayerMap {
|
||||
(None, None) => None,
|
||||
(None, Some(image)) => {
|
||||
let lsn_floor = image.get_lsn_range().start;
|
||||
Some(SearchResult {
|
||||
layer: image,
|
||||
lsn_floor,
|
||||
})
|
||||
Some((
|
||||
SearchResult {
|
||||
layer: image,
|
||||
lsn_floor,
|
||||
},
|
||||
None,
|
||||
))
|
||||
}
|
||||
(Some(delta), None) => {
|
||||
let lsn_floor = delta.get_lsn_range().start;
|
||||
Some(SearchResult {
|
||||
layer: delta,
|
||||
lsn_floor,
|
||||
})
|
||||
Some((
|
||||
SearchResult {
|
||||
layer: delta,
|
||||
lsn_floor,
|
||||
},
|
||||
None,
|
||||
))
|
||||
}
|
||||
(Some(delta), Some(image)) => {
|
||||
let img_lsn = image.get_lsn_range().start;
|
||||
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
|
||||
let image_exact_match = img_lsn + 1 == end_lsn;
|
||||
if image_is_newer || image_exact_match {
|
||||
Some(SearchResult {
|
||||
layer: image,
|
||||
lsn_floor: img_lsn,
|
||||
})
|
||||
if image_is_newer {
|
||||
Some((
|
||||
SearchResult {
|
||||
layer: image,
|
||||
lsn_floor: img_lsn,
|
||||
},
|
||||
None,
|
||||
))
|
||||
} else {
|
||||
let lsn_floor =
|
||||
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
|
||||
Some(SearchResult {
|
||||
layer: delta,
|
||||
lsn_floor,
|
||||
})
|
||||
if image_exact_match {
|
||||
Some((
|
||||
SearchResult {
|
||||
layer: image,
|
||||
lsn_floor: img_lsn,
|
||||
},
|
||||
Some(SearchResult {
|
||||
layer: delta,
|
||||
lsn_floor,
|
||||
}),
|
||||
))
|
||||
} else {
|
||||
Some((
|
||||
SearchResult {
|
||||
layer: delta,
|
||||
lsn_floor,
|
||||
},
|
||||
None,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2424,7 +2424,15 @@ impl Timeline {
|
||||
let mut result = ValueReconstructResult::Continue;
|
||||
let mut cont_lsn = Lsn(request_lsn.0 + 1);
|
||||
|
||||
let mut search_incremental = false;
|
||||
|
||||
'outer: loop {
|
||||
let this_round_search_incremental = if search_incremental {
|
||||
search_incremental = false;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
};
|
||||
// The function should have updated 'state'
|
||||
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
|
||||
match result {
|
||||
@@ -2547,6 +2555,7 @@ impl Timeline {
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
assert!(!this_round_search_incremental);
|
||||
cont_lsn = lsn_floor;
|
||||
// metrics: open_layer does not count as fs access, so we are not updating `read_count`
|
||||
traversal_path.push((
|
||||
@@ -2560,6 +2569,7 @@ impl Timeline {
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
|
||||
for frozen_layer in layers.frozen_layers.iter().rev() {
|
||||
let start_lsn = frozen_layer.get_lsn_range().start;
|
||||
if cont_lsn > start_lsn {
|
||||
@@ -2574,6 +2584,7 @@ impl Timeline {
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
assert!(!this_round_search_incremental);
|
||||
cont_lsn = lsn_floor;
|
||||
// metrics: open_layer does not count as fs access, so we are not updating `read_count`
|
||||
traversal_path.push((
|
||||
@@ -2588,7 +2599,9 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
|
||||
if let Some((SearchResult { lsn_floor, layer }, next)) =
|
||||
layers.search_incremental(key, cont_lsn, this_round_search_incremental)
|
||||
{
|
||||
let layer = timeline.lcache.get_from_desc(&layer);
|
||||
// If it's a remote layer, download it and retry.
|
||||
if let Some(remote_layer) =
|
||||
@@ -2610,6 +2623,17 @@ impl Timeline {
|
||||
Ok(result) => result,
|
||||
Err(e) => return Err(PageReconstructError::from(e)),
|
||||
};
|
||||
|
||||
if !layer.layer_desc().is_delta
|
||||
&& matches!(result, ValueReconstructResult::Continue)
|
||||
{
|
||||
// if is incremental image layer and not found, try again with delta layer
|
||||
if next.is_some() {
|
||||
search_incremental = true;
|
||||
continue 'outer;
|
||||
}
|
||||
};
|
||||
|
||||
cont_lsn = lsn_floor;
|
||||
*read_count += 1;
|
||||
traversal_path.push((
|
||||
@@ -4071,7 +4095,7 @@ impl Timeline {
|
||||
let mut construct_image_for_key = false;
|
||||
let image_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
|
||||
const PAGE_MATERIALIZE_THRESHOLD: usize = 2000;
|
||||
const PAGE_MATERIALIZE_THRESHOLD: usize = 40;
|
||||
|
||||
for x in all_values_iter {
|
||||
let (key, lsn, value) = x?;
|
||||
|
||||
Reference in New Issue
Block a user