This commit is contained in:
Bojan Serafimov
2023-01-12 19:33:54 -05:00
parent 2fcbb46338
commit 5dc99f1b04
4 changed files with 49 additions and 77 deletions

View File

@@ -166,7 +166,7 @@ where
);
if layer.get_key_range() == (Key::MIN..Key::MAX) {
self.l0_delta_layers.push(layer.clone());
self.l0_delta_layers.push(layer);
}
NUM_ONDISK_LAYERS.inc();
@@ -243,7 +243,7 @@ where
}
}
return Ok(true);
Ok(true)
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
@@ -275,14 +275,14 @@ where
// Initialize loop variables
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
let mut current_key = start.clone();
let mut current_key = start;
let mut current_val = version.image_coverage.query(start);
// Loop through the change events and push intervals
for (change_key, change_val) in version.image_coverage.range(start..end) {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push((kr, current_val.take()));
current_key = change_key.clone();
current_key = change_key;
current_val = change_val.clone();
}
@@ -290,7 +290,7 @@ where
let kr = Key::from_i128(current_key)..Key::from_i128(end);
coverage.push((kr, current_val.take()));
return Ok(coverage);
Ok(coverage)
}
/// Count the height of the tallest stack of deltas in this 2d region.
@@ -301,7 +301,11 @@ where
/// we'll need to visit for any page reconstruction in this region.
/// We use this heuristic to decide whether to create an image layer.
pub fn count_deltas(
&self, key: &Range<Key>, lsn: &Range<Lsn>, limit: Option<usize>) -> Result<usize> {
&self,
key: &Range<Key>,
lsn: &Range<Lsn>,
limit: Option<usize>,
) -> Result<usize> {
// We get the delta coverage of the region, and for each part of the coverage
// we recurse right underneath the delta. The recursion depth is limited by
// the largest result this function could return, which is in practice between
@@ -321,7 +325,7 @@ where
// Initialize loop variables
let mut max_stacked_deltas = 0;
let mut current_key = start.clone();
let mut current_key = start;
let mut current_val = version.delta_coverage.query(start);
// Loop through the delta coverage and recurse on each part
@@ -333,14 +337,15 @@ where
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let new_limit = limit.map(|l| l - 1);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
let max_stacked_deltas_underneath =
self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas =
std::cmp::max(max_stacked_deltas, 1 + max_stacked_deltas_underneath);
}
}
}
current_key = change_key.clone();
current_key = change_key;
current_val = change_val.clone();
}
@@ -450,7 +455,11 @@ where
/// TODO actually use this method instead of count_deltas. Currently we only use
/// it for benchmarks.
pub fn get_difficulty_map(
&self, lsn: Lsn, partitioning: &KeyPartitioning, limit: Option<usize>) -> Vec<usize> {
&self,
lsn: Lsn,
partitioning: &KeyPartitioning,
limit: Option<usize>,
) -> Vec<usize> {
// TODO This is a naive implementation. Perf improvements to do:
// 1. Instead of calling self.image_coverage and self.count_deltas,
// iterate the image and delta coverage only once.
@@ -460,12 +469,16 @@ where
.map(|part| {
let mut difficulty = 0;
for range in &part.ranges {
if limit == Some(difficulty) {break;}
if limit == Some(difficulty) {
break;
}
for (img_range, last_img) in self
.image_coverage(range, lsn)
.expect("why would this err?")
{
if limit == Some(difficulty) {break;}
if limit == Some(difficulty) {
break;
}
let img_lsn = if let Some(last_img) = last_img {
last_img.get_lsn_range().end
} else {
@@ -513,19 +526,3 @@ where
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::tenant::storage_layer::LayerDescriptor;
use super::*;
#[test]
fn test_count_deltas() {
let map = LayerMap::<LayerDescriptor>::default();
// TODO implement
}
// TODO add more
}

View File

@@ -34,13 +34,7 @@ impl<Value: Clone> HistoricLayerCoverage<Value> {
///
/// Panics if new layer has older lsn.start than an existing layer.
/// See BufferedHistoricLayerCoverage for a more general insertion method.
pub fn insert(
self: &mut Self,
key: Range<i128>,
lsn: Range<u64>,
value: Value,
is_image: bool,
) {
pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value, is_image: bool) {
// It's only a persistent map, not a retroactive one
if let Some(last_entry) = self.historic.iter().rev().next() {
let last_lsn = last_entry.0;
@@ -56,9 +50,7 @@ impl<Value: Clone> HistoricLayerCoverage<Value> {
if is_image {
self.head.image_coverage.insert(key, lsn.clone(), value);
} else {
self.head
.delta_coverage
.insert(key.clone(), lsn.clone(), value);
self.head.delta_coverage.insert(key, lsn.clone(), value);
}
// Remember history. Clone is O(1)
@@ -66,7 +58,7 @@ impl<Value: Clone> HistoricLayerCoverage<Value> {
}
/// Query at a particular LSN, inclusive
pub fn get_version(self: &Self, lsn: u64) -> Option<&LayerCoverageTuple<Value>> {
pub fn get_version(&self, lsn: u64) -> Option<&LayerCoverageTuple<Value>> {
match self.historic.range(..=lsn).rev().next() {
Some((_, v)) => Some(v),
None => None,
@@ -74,7 +66,7 @@ impl<Value: Clone> HistoricLayerCoverage<Value> {
}
/// Remove all entries after a certain LSN
pub fn trim(self: &mut Self, begin: &u64) {
pub fn trim(&mut self, begin: &u64) {
self.historic.split_off(begin);
self.head = self
.historic
@@ -289,28 +281,22 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
}
}
pub fn insert(
self: &mut Self,
key: Range<i128>,
lsn: Range<u64>,
value: Value,
is_image: bool,
) {
pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value, is_image: bool) {
self.buffer.insert(
(lsn.start, lsn.end, key.start, key.end, is_image),
Some(value.clone()),
Some(value),
);
}
pub fn remove(self: &mut Self, key: Range<i128>, lsn: Range<u64>, is_image: bool) {
pub fn remove(&mut self, key: Range<i128>, lsn: Range<u64>, is_image: bool) {
self.buffer
.insert((lsn.start, lsn.end, key.start, key.end, is_image), None);
}
pub fn rebuild(self: &mut Self) {
pub fn rebuild(&mut self) {
// Find the first LSN that needs to be rebuilt
let rebuild_since: u64 = match self.buffer.iter().next() {
Some(((lsn_start, _, _, _, _), _)) => lsn_start.clone(),
Some(((lsn_start, _, _, _, _), _)) => *lsn_start,
None => return, // No need to rebuild if buffer is empty
};
@@ -318,7 +304,7 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
self.buffer.retain(|rect, layer| {
match layer {
Some(l) => {
let existing = self.layers.insert(rect.clone(), l.clone());
let existing = self.layers.insert(*rect, l.clone());
if existing.is_some() {
// TODO this happened once. Investigate.
// panic!("can't overwrite layer");
@@ -349,7 +335,7 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
}
/// Iterate all the layers
pub fn iter(self: &Self) -> impl '_ + Iterator<Item = Value> {
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
// NOTE we can actually perform this without rebuilding,
// but it's not necessary for now.
if !self.buffer.is_empty() {
@@ -361,7 +347,7 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
/// Return a reference to a queryable map, assuming all updates
/// have already been processed using self.rebuild()
pub fn get(self: &Self) -> anyhow::Result<&HistoricLayerCoverage<Value>> {
pub fn get(&self) -> anyhow::Result<&HistoricLayerCoverage<Value>> {
// NOTE we error here instead of implicitly rebuilding because
// rebuilding is somewhat expensive.
// TODO maybe implicitly rebuild and log/sentry an error?

View File

@@ -42,7 +42,7 @@ impl<Value: Clone> LayerCoverage<Value> {
/// Helper function to subdivide the key range without changing any values
///
/// Complexity: O(log N)
fn add_node(self: &mut Self, key: i128) {
fn add_node(&mut self, key: i128) {
let value = match self.nodes.range(..=key).last() {
Some((_, Some(v))) => Some(v.clone()),
Some((_, None)) => None,
@@ -54,7 +54,7 @@ impl<Value: Clone> LayerCoverage<Value> {
/// Insert a layer.
///
/// Complexity: worst case O(N), in practice O(log N). See not in implementation.
pub fn insert(self: &mut Self, key: Range<i128>, lsn: Range<u64>, value: Value) {
pub fn insert(&mut self, key: Range<i128>, lsn: Range<u64>, value: Value) {
// NOTE The order of the following lines is important!!
// Add nodes at endpoints
@@ -78,8 +78,8 @@ impl<Value: Clone> LayerCoverage<Value> {
};
if needs_cover {
match prev_covered {
true => to_remove.push(k.clone()),
false => to_update.push(k.clone()),
true => to_remove.push(*k),
false => to_update.push(*k),
}
}
prev_covered = needs_cover;
@@ -88,8 +88,7 @@ impl<Value: Clone> LayerCoverage<Value> {
to_remove.push(key.end);
}
for k in to_update {
self.nodes
.insert_mut(k.clone(), Some((lsn.end.clone(), value.clone())));
self.nodes.insert_mut(k, Some((lsn.end, value.clone())));
}
for k in to_remove {
self.nodes.remove_mut(&k);
@@ -99,7 +98,7 @@ impl<Value: Clone> LayerCoverage<Value> {
/// Get the latest (by lsn.end) layer at a given key
///
/// Complexity: O(log N)
pub fn query(self: &Self, key: i128) -> Option<Value> {
pub fn query(&self, key: i128) -> Option<Value> {
self.nodes
.range(..=key)
.rev()
@@ -113,24 +112,14 @@ impl<Value: Clone> LayerCoverage<Value> {
/// want to start with self.query(key.start), and then follow up with self.range
///
/// Complexity: O(log N + result_size)
pub fn range(
self: &Self,
key: Range<i128>,
) -> impl '_ + Iterator<Item = (i128, Option<Value>)> {
pub fn range(&self, key: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<Value>)> {
self.nodes
.range(key)
.map(|(k, v)| (k.clone(), v.as_ref().map(|x| x.1.clone())))
}
/// Like range, but covers the entire key space
pub fn iter(self: &Self) -> impl '_ + Iterator<Item = (i128, Option<Value>)> {
self.nodes
.iter()
.map(|(k, v)| (k.clone(), v.as_ref().map(|x| x.1.clone())))
.map(|(k, v)| (*k, v.as_ref().map(|x| x.1.clone())))
}
/// O(1) clone
pub fn clone(self: &Self) -> Self {
pub fn clone(&self) -> Self {
Self {
nodes: self.nodes.clone(),
}
@@ -153,7 +142,7 @@ impl<T: Clone> Default for LayerCoverageTuple<T> {
}
impl<Value: Clone> LayerCoverageTuple<Value> {
pub fn clone(self: &Self) -> Self {
pub fn clone(&self) -> Self {
Self {
image_coverage: self.image_coverage.clone(),
delta_coverage: self.delta_coverage.clone(),

View File

@@ -2264,8 +2264,8 @@ impl Timeline {
// after we read last_record_lsn, which is passed here in the 'lsn' argument.
if img_lsn < lsn {
let threshold = self.get_image_creation_threshold();
let num_deltas = layers.count_deltas(
&img_range, &(img_lsn..lsn), Some(threshold))?;
let num_deltas =
layers.count_deltas(&img_range, &(img_lsn..lsn), Some(threshold))?;
debug!(
"key range {}-{}, has {} deltas on this timeline in LSN range {}..{}",