Compare commits

...

2 Commits

Author SHA1 Message Date
Bojan Serafimov
c471c25744 Clone less 2023-02-06 14:42:17 -05:00
Bojan Serafimov
e030830397 WIP 2023-02-06 13:55:53 -05:00
4 changed files with 72 additions and 277 deletions

View File

@@ -53,6 +53,7 @@ use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Result;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
@@ -61,6 +62,8 @@ use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement;
use self::historic_layer_coverage::LayerKey;
use super::storage_layer::range_eq;
///
@@ -87,11 +90,18 @@ pub struct LayerMap<L: ?Sized> {
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
/// Index of the historic layers optimized for search
historic: BufferedHistoricLayerCoverage<Arc<L>>,
historic: BufferedHistoricLayerCoverage<LayerKey>,
/// All layers accessible by key. Useful for:
/// 1. Iterating all layers
/// 2. Dereferencing a self.historic search result
/// 3. Replacing a layer with a remote/local version without
/// rebuilding the self.historic index.
mapping: HashMap<LayerKey, Arc<L>>,
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<L>>,
l0_delta_layers: HashMap<LayerKey, Arc<L>>,
}
impl<L: ?Sized> Default for LayerMap<L> {
@@ -100,8 +110,9 @@ impl<L: ?Sized> Default for LayerMap<L> {
open_layer: None,
next_open_layer_at: None,
frozen_layers: VecDeque::default(),
l0_delta_layers: Vec::default(),
l0_delta_layers: HashMap::default(),
historic: BufferedHistoricLayerCoverage::default(),
mapping: HashMap::default(),
}
}
}
@@ -139,24 +150,6 @@ where
self.layer_map.remove_historic_noflush(layer)
}
/// Replaces existing layer iff it is the `expected`.
///
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done.
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected: &Arc<L>,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
self.layer_map.replace_historic_noflush(expected, new)
}
// We will flush on drop anyway, but this method makes it
// more explicit that there is some work being done.
/// Apply all updates
@@ -228,33 +221,38 @@ where
match (latest_delta, latest_image) {
(None, None) => None,
(None, Some(image)) => {
let image = self.mapping.get(&image).unwrap();
let lsn_floor = image.get_lsn_range().start;
Some(SearchResult {
layer: image,
layer: image.clone(),
lsn_floor,
})
}
(Some(delta), None) => {
let delta = self.mapping.get(&delta).unwrap();
let lsn_floor = delta.get_lsn_range().start;
Some(SearchResult {
layer: delta,
layer: delta.clone(),
lsn_floor,
})
}
(Some(delta), Some(image)) => {
let image = self.mapping.get(&image).unwrap();
let delta = self.mapping.get(&delta).unwrap();
let img_lsn = image.get_lsn_range().start;
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
let image_exact_match = img_lsn + 1 == end_lsn;
if image_is_newer || image_exact_match {
Some(SearchResult {
layer: image,
layer: image.clone(),
lsn_floor: img_lsn,
})
} else {
let lsn_floor =
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
Some(SearchResult {
layer: delta,
layer: delta.clone(),
lsn_floor,
})
}
@@ -273,13 +271,12 @@ where
/// Helper function for BatchedUpdates::insert_historic
///
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
self.historic.insert(
historic_layer_coverage::LayerKey::from(&*layer),
Arc::clone(&layer),
);
let key = LayerKey::from(&*layer);
self.historic.insert(key.clone(), key.clone());
self.mapping.insert(key.clone(), layer.clone());
if Self::is_l0(&layer) {
self.l0_delta_layers.push(layer);
self.l0_delta_layers.insert(key, layer.clone());
}
NUM_ONDISK_LAYERS.inc();
@@ -291,27 +288,28 @@ where
/// Helper function for BatchedUpdates::remove_historic
///
pub fn remove_historic_noflush(&mut self, layer: Arc<L>) {
self.historic
.remove(historic_layer_coverage::LayerKey::from(&*layer));
let key = historic_layer_coverage::LayerKey::from(&*layer);
self.historic.remove(key.clone());
self.mapping.remove(&key.clone());
if Self::is_l0(&layer) {
let len_before = self.l0_delta_layers.len();
self.l0_delta_layers
.retain(|other| !Self::compare_arced_layers(other, &layer));
// this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
// there's a chance that the comparison fails at runtime due to it comparing (pointer,
// vtable) pairs.
assert_eq!(
self.l0_delta_layers.len(),
len_before - 1,
"failed to locate removed historic layer from l0_delta_layers"
);
self.l0_delta_layers.remove(&key);
}
NUM_ONDISK_LAYERS.dec();
}
pub(self) fn replace_historic_noflush(
/// Replaces existing layer iff it is the `expected`.
///
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done.
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected: &Arc<L>,
new: Arc<L>,
@@ -332,29 +330,23 @@ where
"expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}"
);
let l0_index = if expected_l0 {
// find the index in case replace worked, we need to replace that as well
Some(
self.l0_delta_layers
.iter()
.position(|slot| Self::compare_arced_layers(slot, expected))
.ok_or_else(|| anyhow::anyhow!("existing l0 delta layer was not found"))?,
)
} else {
None
use std::collections::hash_map::Entry;
if expected_l0 {
match self.mapping.entry(key.clone()) {
Entry::Occupied(mut entry) => entry.insert(new.clone()),
Entry::Vacant(_) => anyhow::bail!("layer doesn't exist"),
};
};
let replaced = self.historic.replace(&key, new.clone(), |existing| {
Self::compare_arced_layers(existing, expected)
});
match self.mapping.entry(key.clone()) {
Entry::Occupied(mut entry) => entry.insert(new.clone()),
Entry::Vacant(_) => anyhow::bail!("layer doesn't exist"),
};
if let Replacement::Replaced { .. } = &replaced {
if let Some(index) = l0_index {
self.l0_delta_layers[index] = new;
}
}
Ok(replaced)
Ok(Replacement::Replaced {
in_buffered: false,
})
}
/// Helper function for BatchedUpdates::drop.
@@ -382,8 +374,8 @@ where
let start = key.start.to_i128();
let end = key.end.to_i128();
let layer_covers = |layer: Option<Arc<L>>| match layer {
Some(layer) => layer.get_lsn_range().start >= lsn.start,
let layer_covers = |key: Option<&LayerKey>| match key {
Some(key) => self.mapping.get(key).unwrap().get_lsn_range().start >= lsn.start,
None => false,
};
@@ -403,7 +395,7 @@ where
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
self.historic.iter()
self.mapping.values().cloned()
}
///
@@ -430,10 +422,13 @@ where
// Initialize loop variables
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
let mut current_key = start;
let mut current_val = version.image_coverage.query(start);
let mut current_val = version.image_coverage.query(start)
.map(|key| self.mapping.get(&key).unwrap().clone());
// Loop through the change events and push intervals
for (change_key, change_val) in version.image_coverage.range(start..end) {
let change_val = change_val.map(|key| self.mapping.get(&key).unwrap().clone());
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push((kr, current_val.take()));
current_key = change_key;
@@ -527,6 +522,7 @@ where
for (change_key, change_val) in version.delta_coverage.range(start..end) {
// If there's a relevant delta in this part, add 1 and recurse down
if let Some(val) = current_val {
let val = self.mapping.get(&val).unwrap().clone();
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start;
@@ -549,6 +545,7 @@ where
// Consider the last part
if let Some(val) = current_val {
let val = self.mapping.get(&val).unwrap().clone();
if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(end);
let lr = lsn.start..val.get_lsn_range().start;
@@ -705,7 +702,7 @@ where
/// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> {
Ok(self.l0_delta_layers.clone())
Ok(self.l0_delta_layers.values().cloned().collect())
}
/// debugging function to print out the contents of the layer map
@@ -730,18 +727,6 @@ where
println!("End dump LayerMap");
Ok(())
}
#[inline(always)]
fn compare_arced_layers(left: &Arc<L>, right: &Arc<L>) -> bool {
// FIXME: ptr_eq might fail to return true for 'dyn' references because of multiple vtables
// can be created in compilation. Clippy complains about this. In practice it seems to
// work.
//
// In future rust versions this might become Arc::as_ptr(left) as *const () ==
// Arc::as_ptr(right) as *const (), we could change to that before.
#[allow(clippy::vtable_address_comparisons)]
Arc::ptr_eq(left, right)
}
}
#[cfg(test)]
@@ -802,7 +787,6 @@ mod tests {
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
let replaced = map
.batch_update()
.replace_historic(&remote, downloaded.clone())
.expect("name derived attributes are the same");
assert!(

View File

@@ -12,7 +12,7 @@ use super::layer_coverage::LayerCoverageTuple;
/// These three values are enough to uniquely identify a layer, since
/// a layer is obligated to contain all contents within range, so two
/// deltas (or images) with the same range have identical content.
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub struct LayerKey {
// TODO I use i128 and u64 because it was easy for prototyping,
// testing, and benchmarking. If we can use the Lsn and Key
@@ -438,46 +438,6 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
///
/// Returns a `Replacement` value describing the outcome; only the case of
/// `Replacement::Replaced` modifies the map and requires a rebuild.
pub fn replace<F>(
&mut self,
layer_key: &LayerKey,
new: Value,
check_expected: F,
) -> Replacement<Value>
where
F: FnOnce(&Value) -> bool,
{
let (slot, in_buffered) = match self.buffer.get(layer_key) {
Some(inner @ Some(_)) => {
// we compare against the buffered version, because there will be a later
// rebuild before querying
(inner.as_ref(), true)
}
Some(None) => {
// buffer has removal for this key; it will not be equivalent by any check_expected.
return Replacement::RemovalBuffered;
}
None => {
// no pending modification for the key, check layers
(self.layers.get(layer_key), false)
}
};
match slot {
Some(existing) if !check_expected(existing) => {
// unfortunate clone here, but otherwise the nll borrowck grows the region of
// 'a to cover the whole function, and we could not mutate in the other
// Some(existing) branch
Replacement::Unexpected(existing.clone())
}
None => Replacement::NotFound,
Some(_existing) => {
self.insert(layer_key.to_owned(), new);
Replacement::Replaced { in_buffered }
}
}
}
pub fn rebuild(&mut self) {
// Find the first LSN that needs to be rebuilt
let rebuild_since: u64 = match self.buffer.iter().next() {
@@ -521,17 +481,6 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
)
}
/// Iterate all the layers
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
// NOTE we can actually perform this without rebuilding,
// but it's not necessary for now.
if !self.buffer.is_empty() {
panic!("rebuild pls")
}
self.layers.values().cloned()
}
/// Return a reference to a queryable map, assuming all updates
/// have already been processed using self.rebuild()
pub fn get(&self) -> anyhow::Result<&HistoricLayerCoverage<Value>> {
@@ -670,139 +619,3 @@ fn test_retroactive_simple() {
assert_eq!(version.image_coverage.query(8), Some("Image 4".to_string()));
}
}
#[test]
fn test_retroactive_replacement() {
let mut map = BufferedHistoricLayerCoverage::new();
let keys = [
LayerKey {
key: 0..5,
lsn: 100..101,
is_image: true,
},
LayerKey {
key: 3..9,
lsn: 110..111,
is_image: true,
},
LayerKey {
key: 4..6,
lsn: 120..121,
is_image: true,
},
];
let layers = [
"Image 1".to_string(),
"Image 2".to_string(),
"Image 3".to_string(),
];
for (key, layer) in keys.iter().zip(layers.iter()) {
map.insert(key.to_owned(), layer.to_owned());
}
// rebuild is not necessary here, because replace works for both buffered updates and existing
// layers.
for (key, orig_layer) in keys.iter().zip(layers.iter()) {
let replacement = format!("Remote {orig_layer}");
// evict
let ret = map.replace(key, replacement.clone(), |l| l == orig_layer);
assert!(
matches!(ret, Replacement::Replaced { .. }),
"replace {orig_layer}: {ret:?}"
);
map.rebuild();
let at = key.lsn.end + 1;
let version = map.get().expect("rebuilt").get_version(at).unwrap();
assert_eq!(
version.image_coverage.query(4).as_deref(),
Some(replacement.as_str()),
"query for 4 at version {at} after eviction",
);
// download
let ret = map.replace(key, orig_layer.clone(), |l| l == &replacement);
assert!(
matches!(ret, Replacement::Replaced { .. }),
"replace {orig_layer} back: {ret:?}"
);
map.rebuild();
let version = map.get().expect("rebuilt").get_version(at).unwrap();
assert_eq!(
version.image_coverage.query(4).as_deref(),
Some(orig_layer.as_str()),
"query for 4 at version {at} after download",
);
}
}
#[test]
fn missing_key_is_not_inserted_with_replace() {
let mut map = BufferedHistoricLayerCoverage::new();
let key = LayerKey {
key: 0..5,
lsn: 100..101,
is_image: true,
};
let ret = map.replace(&key, "should not replace", |_| true);
assert!(matches!(ret, Replacement::NotFound), "{ret:?}");
map.rebuild();
assert!(map
.get()
.expect("no changes to rebuild")
.get_version(102)
.is_none());
}
#[test]
fn replacing_buffered_insert_and_remove() {
let mut map = BufferedHistoricLayerCoverage::new();
let key = LayerKey {
key: 0..5,
lsn: 100..101,
is_image: true,
};
map.insert(key.clone(), "Image 1");
let ret = map.replace(&key, "Remote Image 1", |&l| l == "Image 1");
assert!(
matches!(ret, Replacement::Replaced { in_buffered: true }),
"{ret:?}"
);
map.rebuild();
assert_eq!(
map.get()
.expect("rebuilt")
.get_version(102)
.unwrap()
.image_coverage
.query(4),
Some("Remote Image 1")
);
map.remove(key.clone());
let ret = map.replace(&key, "should not replace", |_| true);
assert!(
matches!(ret, Replacement::RemovalBuffered),
"cannot replace after scheduled remove: {ret:?}"
);
map.rebuild();
let ret = map.replace(&key, "should not replace", |_| true);
assert!(
matches!(ret, Replacement::NotFound),
"cannot replace after remove + rebuild: {ret:?}"
);
let at_version = map.get().expect("rebuilt").get_version(102);
assert!(at_version.is_none());
}

View File

@@ -101,24 +101,24 @@ impl<Value: Clone> LayerCoverage<Value> {
/// Get the latest (by lsn.end) layer at a given key
///
/// Complexity: O(log N)
pub fn query(&self, key: i128) -> Option<Value> {
pub fn query(&self, key: i128) -> Option<&Value> {
self.nodes
.range(..=key)
.rev()
.next()?
.1
.as_ref()
.map(|(_, v)| v.clone())
.map(|(_, v)| v)
}
/// Iterate the changes in layer coverage in a given range. You will likely
/// want to start with self.query(key.start), and then follow up with self.range
///
/// Complexity: O(log N + result_size)
pub fn range(&self, key: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<Value>)> {
pub fn range(&self, key: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<&Value>)> {
self.nodes
.range(key)
.map(|(k, v)| (*k, v.as_ref().map(|x| x.1.clone())))
.map(|(k, v)| (*k, v.as_ref().map(|x| &x.1)))
}
/// O(1) clone

View File

@@ -3394,11 +3394,10 @@ impl Timeline {
// Delta- or ImageLayer in the layer map.
let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size);
let mut layers = self_clone.layers.write().unwrap();
let mut updates = layers.batch_update();
{
use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
match updates.replace_historic(&l, new_layer) {
match layers.replace_historic(&l, new_layer) {
Ok(Replacement::Replaced { .. }) => { /* expected */ }
Ok(Replacement::NotFound) => {
// TODO: the downloaded file should probably be removed, otherwise
@@ -3432,7 +3431,6 @@ impl Timeline {
}
}
}
updates.flush();
drop(layers);
// Now that we've inserted the download into the layer map,