Compare commits

...

2 Commits

Author SHA1 Message Date
Bojan Serafimov
c471c25744 Clone less 2023-02-06 14:42:17 -05:00
Bojan Serafimov
e030830397 WIP 2023-02-06 13:55:53 -05:00
4 changed files with 72 additions and 277 deletions

View File

@@ -53,6 +53,7 @@ use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer; use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer; use crate::tenant::storage_layer::Layer;
use anyhow::Result; use anyhow::Result;
use std::collections::HashMap;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::ops::Range; use std::ops::Range;
use std::sync::Arc; use std::sync::Arc;
@@ -61,6 +62,8 @@ use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage; use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement; pub use historic_layer_coverage::Replacement;
use self::historic_layer_coverage::LayerKey;
use super::storage_layer::range_eq; use super::storage_layer::range_eq;
/// ///
@@ -87,11 +90,18 @@ pub struct LayerMap<L: ?Sized> {
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>, pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
/// Index of the historic layers optimized for search /// Index of the historic layers optimized for search
historic: BufferedHistoricLayerCoverage<Arc<L>>, historic: BufferedHistoricLayerCoverage<LayerKey>,
/// All layers accessible by key. Useful for:
/// 1. Iterating all layers
/// 2. Dereferencing a self.historic search result
/// 3. Replacing a layer with a remote/local version without
/// rebuilding the self.historic index.
mapping: HashMap<LayerKey, Arc<L>>,
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient. /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree. /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<L>>, l0_delta_layers: HashMap<LayerKey, Arc<L>>,
} }
impl<L: ?Sized> Default for LayerMap<L> { impl<L: ?Sized> Default for LayerMap<L> {
@@ -100,8 +110,9 @@ impl<L: ?Sized> Default for LayerMap<L> {
open_layer: None, open_layer: None,
next_open_layer_at: None, next_open_layer_at: None,
frozen_layers: VecDeque::default(), frozen_layers: VecDeque::default(),
l0_delta_layers: Vec::default(), l0_delta_layers: HashMap::default(),
historic: BufferedHistoricLayerCoverage::default(), historic: BufferedHistoricLayerCoverage::default(),
mapping: HashMap::default(),
} }
} }
} }
@@ -139,24 +150,6 @@ where
self.layer_map.remove_historic_noflush(layer) self.layer_map.remove_historic_noflush(layer)
} }
/// Replaces existing layer iff it is the `expected`.
///
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done.
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected: &Arc<L>,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
self.layer_map.replace_historic_noflush(expected, new)
}
// We will flush on drop anyway, but this method makes it // We will flush on drop anyway, but this method makes it
// more explicit that there is some work being done. // more explicit that there is some work being done.
/// Apply all updates /// Apply all updates
@@ -228,33 +221,38 @@ where
match (latest_delta, latest_image) { match (latest_delta, latest_image) {
(None, None) => None, (None, None) => None,
(None, Some(image)) => { (None, Some(image)) => {
let image = self.mapping.get(&image).unwrap();
let lsn_floor = image.get_lsn_range().start; let lsn_floor = image.get_lsn_range().start;
Some(SearchResult { Some(SearchResult {
layer: image, layer: image.clone(),
lsn_floor, lsn_floor,
}) })
} }
(Some(delta), None) => { (Some(delta), None) => {
let delta = self.mapping.get(&delta).unwrap();
let lsn_floor = delta.get_lsn_range().start; let lsn_floor = delta.get_lsn_range().start;
Some(SearchResult { Some(SearchResult {
layer: delta, layer: delta.clone(),
lsn_floor, lsn_floor,
}) })
} }
(Some(delta), Some(image)) => { (Some(delta), Some(image)) => {
let image = self.mapping.get(&image).unwrap();
let delta = self.mapping.get(&delta).unwrap();
let img_lsn = image.get_lsn_range().start; let img_lsn = image.get_lsn_range().start;
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end; let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
let image_exact_match = img_lsn + 1 == end_lsn; let image_exact_match = img_lsn + 1 == end_lsn;
if image_is_newer || image_exact_match { if image_is_newer || image_exact_match {
Some(SearchResult { Some(SearchResult {
layer: image, layer: image.clone(),
lsn_floor: img_lsn, lsn_floor: img_lsn,
}) })
} else { } else {
let lsn_floor = let lsn_floor =
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1); std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
Some(SearchResult { Some(SearchResult {
layer: delta, layer: delta.clone(),
lsn_floor, lsn_floor,
}) })
} }
@@ -273,13 +271,12 @@ where
/// Helper function for BatchedUpdates::insert_historic /// Helper function for BatchedUpdates::insert_historic
/// ///
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) { pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
self.historic.insert( let key = LayerKey::from(&*layer);
historic_layer_coverage::LayerKey::from(&*layer), self.historic.insert(key.clone(), key.clone());
Arc::clone(&layer), self.mapping.insert(key.clone(), layer.clone());
);
if Self::is_l0(&layer) { if Self::is_l0(&layer) {
self.l0_delta_layers.push(layer); self.l0_delta_layers.insert(key, layer.clone());
} }
NUM_ONDISK_LAYERS.inc(); NUM_ONDISK_LAYERS.inc();
@@ -291,27 +288,28 @@ where
/// Helper function for BatchedUpdates::remove_historic /// Helper function for BatchedUpdates::remove_historic
/// ///
pub fn remove_historic_noflush(&mut self, layer: Arc<L>) { pub fn remove_historic_noflush(&mut self, layer: Arc<L>) {
self.historic let key = historic_layer_coverage::LayerKey::from(&*layer);
.remove(historic_layer_coverage::LayerKey::from(&*layer)); self.historic.remove(key.clone());
self.mapping.remove(&key.clone());
if Self::is_l0(&layer) { if Self::is_l0(&layer) {
let len_before = self.l0_delta_layers.len(); self.l0_delta_layers.remove(&key);
self.l0_delta_layers
.retain(|other| !Self::compare_arced_layers(other, &layer));
// this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
// there's a chance that the comparison fails at runtime due to it comparing (pointer,
// vtable) pairs.
assert_eq!(
self.l0_delta_layers.len(),
len_before - 1,
"failed to locate removed historic layer from l0_delta_layers"
);
} }
NUM_ONDISK_LAYERS.dec(); NUM_ONDISK_LAYERS.dec();
} }
pub(self) fn replace_historic_noflush( /// Replaces existing layer iff it is the `expected`.
///
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done.
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self, &mut self,
expected: &Arc<L>, expected: &Arc<L>,
new: Arc<L>, new: Arc<L>,
@@ -332,29 +330,23 @@ where
"expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}" "expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}"
); );
let l0_index = if expected_l0 { use std::collections::hash_map::Entry;
// find the index in case replace worked, we need to replace that as well
Some( if expected_l0 {
self.l0_delta_layers match self.mapping.entry(key.clone()) {
.iter() Entry::Occupied(mut entry) => entry.insert(new.clone()),
.position(|slot| Self::compare_arced_layers(slot, expected)) Entry::Vacant(_) => anyhow::bail!("layer doesn't exist"),
.ok_or_else(|| anyhow::anyhow!("existing l0 delta layer was not found"))?, };
)
} else {
None
}; };
let replaced = self.historic.replace(&key, new.clone(), |existing| { match self.mapping.entry(key.clone()) {
Self::compare_arced_layers(existing, expected) Entry::Occupied(mut entry) => entry.insert(new.clone()),
}); Entry::Vacant(_) => anyhow::bail!("layer doesn't exist"),
};
if let Replacement::Replaced { .. } = &replaced { Ok(Replacement::Replaced {
if let Some(index) = l0_index { in_buffered: false,
self.l0_delta_layers[index] = new; })
}
}
Ok(replaced)
} }
/// Helper function for BatchedUpdates::drop. /// Helper function for BatchedUpdates::drop.
@@ -382,8 +374,8 @@ where
let start = key.start.to_i128(); let start = key.start.to_i128();
let end = key.end.to_i128(); let end = key.end.to_i128();
let layer_covers = |layer: Option<Arc<L>>| match layer { let layer_covers = |key: Option<&LayerKey>| match key {
Some(layer) => layer.get_lsn_range().start >= lsn.start, Some(key) => self.mapping.get(key).unwrap().get_lsn_range().start >= lsn.start,
None => false, None => false,
}; };
@@ -403,7 +395,7 @@ where
} }
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> { pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
self.historic.iter() self.mapping.values().cloned()
} }
/// ///
@@ -430,10 +422,13 @@ where
// Initialize loop variables // Initialize loop variables
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![]; let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
let mut current_key = start; let mut current_key = start;
let mut current_val = version.image_coverage.query(start); let mut current_val = version.image_coverage.query(start)
.map(|key| self.mapping.get(&key).unwrap().clone());
// Loop through the change events and push intervals // Loop through the change events and push intervals
for (change_key, change_val) in version.image_coverage.range(start..end) { for (change_key, change_val) in version.image_coverage.range(start..end) {
let change_val = change_val.map(|key| self.mapping.get(&key).unwrap().clone());
let kr = Key::from_i128(current_key)..Key::from_i128(change_key); let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push((kr, current_val.take())); coverage.push((kr, current_val.take()));
current_key = change_key; current_key = change_key;
@@ -527,6 +522,7 @@ where
for (change_key, change_val) in version.delta_coverage.range(start..end) { for (change_key, change_val) in version.delta_coverage.range(start..end) {
// If there's a relevant delta in this part, add 1 and recurse down // If there's a relevant delta in this part, add 1 and recurse down
if let Some(val) = current_val { if let Some(val) = current_val {
let val = self.mapping.get(&val).unwrap().clone();
if val.get_lsn_range().end > lsn.start { if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key); let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start; let lr = lsn.start..val.get_lsn_range().start;
@@ -549,6 +545,7 @@ where
// Consider the last part // Consider the last part
if let Some(val) = current_val { if let Some(val) = current_val {
let val = self.mapping.get(&val).unwrap().clone();
if val.get_lsn_range().end > lsn.start { if val.get_lsn_range().end > lsn.start {
let kr = Key::from_i128(current_key)..Key::from_i128(end); let kr = Key::from_i128(current_key)..Key::from_i128(end);
let lr = lsn.start..val.get_lsn_range().start; let lr = lsn.start..val.get_lsn_range().start;
@@ -705,7 +702,7 @@ where
/// Return all L0 delta layers /// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> { pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> {
Ok(self.l0_delta_layers.clone()) Ok(self.l0_delta_layers.values().cloned().collect())
} }
/// debugging function to print out the contents of the layer map /// debugging function to print out the contents of the layer map
@@ -730,18 +727,6 @@ where
println!("End dump LayerMap"); println!("End dump LayerMap");
Ok(()) Ok(())
} }
#[inline(always)]
fn compare_arced_layers(left: &Arc<L>, right: &Arc<L>) -> bool {
// FIXME: ptr_eq might fail to return true for 'dyn' references because of multiple vtables
// can be created in compilation. Clippy complains about this. In practice it seems to
// work.
//
// In future rust versions this might become Arc::as_ptr(left) as *const () ==
// Arc::as_ptr(right) as *const (), we could change to that before.
#[allow(clippy::vtable_address_comparisons)]
Arc::ptr_eq(left, right)
}
} }
#[cfg(test)] #[cfg(test)]
@@ -802,7 +787,6 @@ mod tests {
assert_eq!(count_layer_in(&map, &remote), expected_in_counts); assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
let replaced = map let replaced = map
.batch_update()
.replace_historic(&remote, downloaded.clone()) .replace_historic(&remote, downloaded.clone())
.expect("name derived attributes are the same"); .expect("name derived attributes are the same");
assert!( assert!(

View File

@@ -12,7 +12,7 @@ use super::layer_coverage::LayerCoverageTuple;
/// These three values are enough to uniquely identify a layer, since /// These three values are enough to uniquely identify a layer, since
/// a layer is obligated to contain all contents within range, so two /// a layer is obligated to contain all contents within range, so two
/// deltas (or images) with the same range have identical content. /// deltas (or images) with the same range have identical content.
#[derive(Debug, PartialEq, Eq, Clone)] #[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub struct LayerKey { pub struct LayerKey {
// TODO I use i128 and u64 because it was easy for prototyping, // TODO I use i128 and u64 because it was easy for prototyping,
// testing, and benchmarking. If we can use the Lsn and Key // testing, and benchmarking. If we can use the Lsn and Key
@@ -438,46 +438,6 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
/// ///
/// Returns a `Replacement` value describing the outcome; only the case of /// Returns a `Replacement` value describing the outcome; only the case of
/// `Replacement::Replaced` modifies the map and requires a rebuild. /// `Replacement::Replaced` modifies the map and requires a rebuild.
pub fn replace<F>(
&mut self,
layer_key: &LayerKey,
new: Value,
check_expected: F,
) -> Replacement<Value>
where
F: FnOnce(&Value) -> bool,
{
let (slot, in_buffered) = match self.buffer.get(layer_key) {
Some(inner @ Some(_)) => {
// we compare against the buffered version, because there will be a later
// rebuild before querying
(inner.as_ref(), true)
}
Some(None) => {
// buffer has removal for this key; it will not be equivalent by any check_expected.
return Replacement::RemovalBuffered;
}
None => {
// no pending modification for the key, check layers
(self.layers.get(layer_key), false)
}
};
match slot {
Some(existing) if !check_expected(existing) => {
// unfortunate clone here, but otherwise the nll borrowck grows the region of
// 'a to cover the whole function, and we could not mutate in the other
// Some(existing) branch
Replacement::Unexpected(existing.clone())
}
None => Replacement::NotFound,
Some(_existing) => {
self.insert(layer_key.to_owned(), new);
Replacement::Replaced { in_buffered }
}
}
}
pub fn rebuild(&mut self) { pub fn rebuild(&mut self) {
// Find the first LSN that needs to be rebuilt // Find the first LSN that needs to be rebuilt
let rebuild_since: u64 = match self.buffer.iter().next() { let rebuild_since: u64 = match self.buffer.iter().next() {
@@ -521,17 +481,6 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
) )
} }
/// Iterate all the layers
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
// NOTE we can actually perform this without rebuilding,
// but it's not necessary for now.
if !self.buffer.is_empty() {
panic!("rebuild pls")
}
self.layers.values().cloned()
}
/// Return a reference to a queryable map, assuming all updates /// Return a reference to a queryable map, assuming all updates
/// have already been processed using self.rebuild() /// have already been processed using self.rebuild()
pub fn get(&self) -> anyhow::Result<&HistoricLayerCoverage<Value>> { pub fn get(&self) -> anyhow::Result<&HistoricLayerCoverage<Value>> {
@@ -670,139 +619,3 @@ fn test_retroactive_simple() {
assert_eq!(version.image_coverage.query(8), Some("Image 4".to_string())); assert_eq!(version.image_coverage.query(8), Some("Image 4".to_string()));
} }
} }
#[test]
fn test_retroactive_replacement() {
let mut map = BufferedHistoricLayerCoverage::new();
let keys = [
LayerKey {
key: 0..5,
lsn: 100..101,
is_image: true,
},
LayerKey {
key: 3..9,
lsn: 110..111,
is_image: true,
},
LayerKey {
key: 4..6,
lsn: 120..121,
is_image: true,
},
];
let layers = [
"Image 1".to_string(),
"Image 2".to_string(),
"Image 3".to_string(),
];
for (key, layer) in keys.iter().zip(layers.iter()) {
map.insert(key.to_owned(), layer.to_owned());
}
// rebuild is not necessary here, because replace works for both buffered updates and existing
// layers.
for (key, orig_layer) in keys.iter().zip(layers.iter()) {
let replacement = format!("Remote {orig_layer}");
// evict
let ret = map.replace(key, replacement.clone(), |l| l == orig_layer);
assert!(
matches!(ret, Replacement::Replaced { .. }),
"replace {orig_layer}: {ret:?}"
);
map.rebuild();
let at = key.lsn.end + 1;
let version = map.get().expect("rebuilt").get_version(at).unwrap();
assert_eq!(
version.image_coverage.query(4).as_deref(),
Some(replacement.as_str()),
"query for 4 at version {at} after eviction",
);
// download
let ret = map.replace(key, orig_layer.clone(), |l| l == &replacement);
assert!(
matches!(ret, Replacement::Replaced { .. }),
"replace {orig_layer} back: {ret:?}"
);
map.rebuild();
let version = map.get().expect("rebuilt").get_version(at).unwrap();
assert_eq!(
version.image_coverage.query(4).as_deref(),
Some(orig_layer.as_str()),
"query for 4 at version {at} after download",
);
}
}
#[test]
fn missing_key_is_not_inserted_with_replace() {
let mut map = BufferedHistoricLayerCoverage::new();
let key = LayerKey {
key: 0..5,
lsn: 100..101,
is_image: true,
};
let ret = map.replace(&key, "should not replace", |_| true);
assert!(matches!(ret, Replacement::NotFound), "{ret:?}");
map.rebuild();
assert!(map
.get()
.expect("no changes to rebuild")
.get_version(102)
.is_none());
}
#[test]
fn replacing_buffered_insert_and_remove() {
let mut map = BufferedHistoricLayerCoverage::new();
let key = LayerKey {
key: 0..5,
lsn: 100..101,
is_image: true,
};
map.insert(key.clone(), "Image 1");
let ret = map.replace(&key, "Remote Image 1", |&l| l == "Image 1");
assert!(
matches!(ret, Replacement::Replaced { in_buffered: true }),
"{ret:?}"
);
map.rebuild();
assert_eq!(
map.get()
.expect("rebuilt")
.get_version(102)
.unwrap()
.image_coverage
.query(4),
Some("Remote Image 1")
);
map.remove(key.clone());
let ret = map.replace(&key, "should not replace", |_| true);
assert!(
matches!(ret, Replacement::RemovalBuffered),
"cannot replace after scheduled remove: {ret:?}"
);
map.rebuild();
let ret = map.replace(&key, "should not replace", |_| true);
assert!(
matches!(ret, Replacement::NotFound),
"cannot replace after remove + rebuild: {ret:?}"
);
let at_version = map.get().expect("rebuilt").get_version(102);
assert!(at_version.is_none());
}

View File

@@ -101,24 +101,24 @@ impl<Value: Clone> LayerCoverage<Value> {
/// Get the latest (by lsn.end) layer at a given key /// Get the latest (by lsn.end) layer at a given key
/// ///
/// Complexity: O(log N) /// Complexity: O(log N)
pub fn query(&self, key: i128) -> Option<Value> { pub fn query(&self, key: i128) -> Option<&Value> {
self.nodes self.nodes
.range(..=key) .range(..=key)
.rev() .rev()
.next()? .next()?
.1 .1
.as_ref() .as_ref()
.map(|(_, v)| v.clone()) .map(|(_, v)| v)
} }
/// Iterate the changes in layer coverage in a given range. You will likely /// Iterate the changes in layer coverage in a given range. You will likely
/// want to start with self.query(key.start), and then follow up with self.range /// want to start with self.query(key.start), and then follow up with self.range
/// ///
/// Complexity: O(log N + result_size) /// Complexity: O(log N + result_size)
pub fn range(&self, key: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<Value>)> { pub fn range(&self, key: Range<i128>) -> impl '_ + Iterator<Item = (i128, Option<&Value>)> {
self.nodes self.nodes
.range(key) .range(key)
.map(|(k, v)| (*k, v.as_ref().map(|x| x.1.clone()))) .map(|(k, v)| (*k, v.as_ref().map(|x| &x.1)))
} }
/// O(1) clone /// O(1) clone

View File

@@ -3394,11 +3394,10 @@ impl Timeline {
// Delta- or ImageLayer in the layer map. // Delta- or ImageLayer in the layer map.
let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size); let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size);
let mut layers = self_clone.layers.write().unwrap(); let mut layers = self_clone.layers.write().unwrap();
let mut updates = layers.batch_update();
{ {
use crate::tenant::layer_map::Replacement; use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone(); let l: Arc<dyn PersistentLayer> = remote_layer.clone();
match updates.replace_historic(&l, new_layer) { match layers.replace_historic(&l, new_layer) {
Ok(Replacement::Replaced { .. }) => { /* expected */ } Ok(Replacement::Replaced { .. }) => { /* expected */ }
Ok(Replacement::NotFound) => { Ok(Replacement::NotFound) => {
// TODO: the downloaded file should probably be removed, otherwise // TODO: the downloaded file should probably be removed, otherwise
@@ -3432,7 +3431,6 @@ impl Timeline {
} }
} }
} }
updates.flush();
drop(layers); drop(layers);
// Now that we've inserted the download into the layer map, // Now that we've inserted the download into the layer map,