Organize modules, rename structs

Bojan Serafimov
2023-01-10 21:03:26 -05:00
parent d3d17f2c7c
commit cb5b9375d2
5 changed files with 73 additions and 72 deletions
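
At a glance, the renames this commit applies (old name on the left, new name on the right, compiled from the hunks below):

PersistentLayerMap<Value>   -> HistoricLayerCoverage<Value>
RetroactiveLayerMap<Value>  -> BufferedHistoricLayerCoverage<Value>
LatestLayerMap<Value>       -> LayerCoverageTuple<Value>
LayerMap::index (field)     -> LayerMap::historic
bst_layer_map (module)      -> layer_map::historic_layer_coverage
latest_layer_map (module)   -> folded into layer_map::layer_coverage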

View File

@@ -74,11 +74,8 @@ use utils::{
mod blob_io;
pub mod block_io;
-pub mod bst_layer_map;
mod disk_btree;
pub(crate) mod ephemeral_file;
-pub mod latest_layer_map;
-pub mod layer_coverage;
pub mod layer_map;
pub mod metadata;
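
After this hunk the coverage code lives under layer_map rather than at the crate root. A sketch of the resulting module layout (the mod declarations come from this commit; the file paths are an assumption for illustration):

// layer_map.rs                          -- pub mod layer_map (the public API)
// layer_map/layer_coverage.rs           -- mod layer_coverage: LayerCoverage, LayerCoverageTuple
// layer_map/historic_layer_coverage.rs  -- mod historic_layer_coverage: HistoricLayerCoverage, BufferedHistoricLayerCoverage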

View File

@@ -1,28 +0,0 @@
-use super::layer_coverage::LayerCoverage;
-/// Separate coverage data structure for each layer type.
-///
-/// This is just a tuple but with the elements named to
-/// prevent bugs.
-pub struct LatestLayerMap<Value> {
-pub image_coverage: LayerCoverage<Value>,
-pub delta_coverage: LayerCoverage<Value>,
-}
-impl<T: Clone> Default for LatestLayerMap<T> {
-fn default() -> Self {
-Self {
-image_coverage: LayerCoverage::default(),
-delta_coverage: LayerCoverage::default(),
-}
-}
-}
-impl<Value: Clone> LatestLayerMap<Value> {
-pub fn clone(self: &Self) -> Self {
-Self {
-image_coverage: self.image_coverage.clone(),
-delta_coverage: self.delta_coverage.clone(),
-}
-}
-}

View File

@@ -10,6 +10,9 @@
//! corresponding files are written to disk.
//!
+mod historic_layer_coverage;
+mod layer_coverage;
use crate::keyspace::KeyPartitioning;
use crate::metrics::NUM_ONDISK_LAYERS;
use crate::repository::Key;
@@ -21,7 +24,7 @@ use std::ops::Range;
use std::sync::Arc;
use utils::lsn::Lsn;
-use super::bst_layer_map::RetroactiveLayerMap;
+use historic_layer_coverage::BufferedHistoricLayerCoverage;
///
/// LayerMap tracks what layers exist on a timeline.
@@ -47,7 +50,7 @@ pub struct LayerMap<L: ?Sized> {
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
/// Index of the historic layers optimized for search
-index: RetroactiveLayerMap<Arc<L>>,
+historic: BufferedHistoricLayerCoverage<Arc<L>>,
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
@@ -61,7 +64,7 @@ impl<L: ?Sized> Default for LayerMap<L> {
next_open_layer_at: None,
frozen_layers: VecDeque::default(),
l0_delta_layers: Vec::default(),
-index: RetroactiveLayerMap::default(),
+historic: BufferedHistoricLayerCoverage::default(),
}
}
}
@@ -91,7 +94,7 @@ where
/// 'open' and 'frozen' layers!
///
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
-let version = self.index.get().unwrap().get_version(end_lsn.0 - 1)?;
+let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128());
let latest_image = version.image_coverage.query(key.to_i128());
@@ -138,7 +141,7 @@ where
pub fn insert_historic(&mut self, layer: Arc<L>) {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
-self.index.insert(
+self.historic.insert(
kr.start.to_i128()..kr.end.to_i128(),
lr.start.0..lr.end.0,
Arc::clone(&layer),
@@ -154,7 +157,7 @@ where
/// Must be called after a batch of insert_historic calls, before querying
pub fn rebuild_index(&mut self) {
-self.index.rebuild();
+self.historic.rebuild();
}
///
@@ -165,7 +168,7 @@ where
pub fn remove_historic(&mut self, layer: Arc<L>) {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
-self.index.remove(
+self.historic.remove(
kr.start.to_i128()..kr.end.to_i128(),
lr.start.0..lr.end.0,
!layer.is_incremental(),
@@ -196,7 +199,7 @@ where
return Ok(true);
}
-let version = match self.index.get().unwrap().get_version(lsn.end.0) {
+let version = match self.historic.get().unwrap().get_version(lsn.end.0) {
Some(v) => v,
None => return Ok(false),
};
@@ -225,7 +228,7 @@ where
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
-self.index.iter()
+self.historic.iter()
}
///
@@ -241,7 +244,7 @@ where
key_range: &Range<Key>,
lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> {
-let version = match self.index.get().unwrap().get_version(lsn.0 - 1) {
+let version = match self.historic.get().unwrap().get_version(lsn.0 - 1) {
Some(v) => v,
None => return Ok(vec![]),
};
@@ -283,7 +286,7 @@ where
return Ok(0);
}
-let version = match self.index.get().unwrap().get_version(lsn.end.0 - 1) {
+let version = match self.historic.get().unwrap().get_version(lsn.end.0 - 1) {
Some(v) => v,
None => return Ok(0),
};

View File

@@ -1,26 +1,31 @@
use std::collections::BTreeMap;
use std::ops::Range;
-use super::latest_layer_map::LatestLayerMap;
+use super::layer_coverage::LayerCoverageTuple;
-pub struct PersistentLayerMap<Value> {
-/// The latest-only solution
-head: LatestLayerMap<Value>,
+/// Efficiently queryable layer coverage for each LSN.
+///
+/// Allows answering layer map queries very efficiently,
+/// but doesn't allow retroactive insertion, which is
+/// sometimes necessary. See BufferedHistoricLayerCoverage.
+pub struct HistoricLayerCoverage<Value> {
+/// The latest state
+head: LayerCoverageTuple<Value>,
/// All previous states
-historic: BTreeMap<u64, LatestLayerMap<Value>>,
+historic: BTreeMap<u64, LayerCoverageTuple<Value>>,
}
-impl<T: Clone> Default for PersistentLayerMap<T> {
+impl<T: Clone> Default for HistoricLayerCoverage<T> {
fn default() -> Self {
Self::new()
}
}
-impl<Value: Clone> PersistentLayerMap<Value> {
+impl<Value: Clone> HistoricLayerCoverage<Value> {
pub fn new() -> Self {
Self {
-head: LatestLayerMap::default(),
+head: LayerCoverageTuple::default(),
historic: BTreeMap::default(),
}
}
@@ -56,7 +61,7 @@ impl<Value: Clone> PersistentLayerMap<Value> {
self.historic.insert(lsn.start, self.head.clone());
}
-pub fn get_version(self: &Self, lsn: u64) -> Option<&LatestLayerMap<Value>> {
+pub fn get_version(self: &Self, lsn: u64) -> Option<&LayerCoverageTuple<Value>> {
match self.historic.range(..=lsn).rev().next() {
Some((_, v)) => Some(v),
None => None,
@@ -79,7 +84,7 @@ impl<Value: Clone> PersistentLayerMap<Value> {
/// All layers in this test have height 1.
#[test]
fn test_persistent_simple() {
-let mut map = PersistentLayerMap::<String>::new();
+let mut map = HistoricLayerCoverage::<String>::new();
map.insert(0..5, 100..101, "Layer 1".to_string(), true);
map.insert(3..9, 110..111, "Layer 2".to_string(), true);
map.insert(5..6, 120..121, "Layer 3".to_string(), true);
@@ -105,7 +110,7 @@ fn test_persistent_simple() {
/// Cover simple off-by-one edge cases
#[test]
fn test_off_by_one() {
-let mut map = PersistentLayerMap::<String>::new();
+let mut map = HistoricLayerCoverage::<String>::new();
map.insert(3..5, 100..110, "Layer 1".to_string(), true);
// Check different LSNs
@@ -127,7 +132,7 @@ fn test_off_by_one() {
/// Cover edge cases where layers begin or end on the same key
#[test]
fn test_key_collision() {
-let mut map = PersistentLayerMap::<String>::new();
+let mut map = HistoricLayerCoverage::<String>::new();
map.insert(3..5, 100..110, "Layer 10".to_string(), true);
map.insert(5..8, 100..110, "Layer 11".to_string(), true);
@@ -172,7 +177,7 @@ fn test_key_collision() {
/// Test when rectangles have nontrivial height and possibly overlap
#[test]
fn test_persistent_overlapping() {
-let mut map = PersistentLayerMap::<String>::new();
+let mut map = HistoricLayerCoverage::<String>::new();
// Add 3 key-disjoint layers with varying LSN ranges
map.insert(1..2, 100..200, "Layer 1".to_string(), true);
@@ -219,7 +224,7 @@ fn test_persistent_overlapping() {
assert_eq!(version.image_coverage.query(8), Some("Layer 6".to_string()));
}
-/// Wrapper for PersistentLayerMap that allows us to hack around the lack
+/// Wrapper for HistoricLayerCoverage that allows us to hack around the lack
/// of support for retroactive insertion by rebuilding the map since the
/// change.
///
@@ -238,9 +243,9 @@ fn test_persistent_overlapping() {
///
/// See this for more on persistent and retroactive techniques:
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
-pub struct RetroactiveLayerMap<Value> {
+pub struct BufferedHistoricLayerCoverage<Value> {
/// A persistent layer map that we rebuild when we need to retroactively update
-map: PersistentLayerMap<Value>,
+historic_coverage: HistoricLayerCoverage<Value>,
/// We buffer insertion into the PersistentLayerMap to decrease the number of rebuilds.
///
@@ -254,7 +259,7 @@ pub struct RetroactiveLayerMap<Value> {
layers: BTreeMap<(u64, u64, i128, i128, bool), Value>,
}
-impl<T: std::fmt::Debug> std::fmt::Debug for RetroactiveLayerMap<T> {
+impl<T: std::fmt::Debug> std::fmt::Debug for BufferedHistoricLayerCoverage<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RetroactiveLayerMap")
.field("buffer", &self.buffer)
@@ -263,16 +268,16 @@ impl<T: std::fmt::Debug> std::fmt::Debug for RetroactiveLayerMap<T> {
}
}
-impl<T: Clone> Default for RetroactiveLayerMap<T> {
+impl<T: Clone> Default for BufferedHistoricLayerCoverage<T> {
fn default() -> Self {
Self::new()
}
}
-impl<Value: Clone> RetroactiveLayerMap<Value> {
+impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
pub fn new() -> Self {
Self {
-map: PersistentLayerMap::<Value>::new(),
+historic_coverage: HistoricLayerCoverage::<Value>::new(),
buffer: BTreeMap::new(),
layers: BTreeMap::new(),
}
@@ -324,11 +329,11 @@ impl<Value: Clone> RetroactiveLayerMap<Value> {
});
// Rebuild
-self.map.trim(&rebuild_since);
+self.historic_coverage.trim(&rebuild_since);
for ((lsn_start, lsn_end, key_start, key_end, is_image), layer) in
self.layers.range((rebuild_since, 0, 0, 0, false)..)
{
-self.map.insert(
+self.historic_coverage.insert(
*key_start..*key_end,
*lsn_start..*lsn_end,
layer.clone(),
@@ -337,10 +342,6 @@ impl<Value: Clone> RetroactiveLayerMap<Value> {
}
}
-pub fn clear(self: &mut Self) {
-self.map.trim(&0);
-}
/// Iterate all the layers
pub fn iter(self: &Self) -> impl '_ + Iterator<Item = Value> {
// NOTE we can actually perform this without rebuilding,
@@ -354,7 +355,7 @@ impl<Value: Clone> RetroactiveLayerMap<Value> {
/// Return a reference to a queryable map, assuming all updates
/// have already been processed using self.rebuild()
-pub fn get(self: &Self) -> anyhow::Result<&PersistentLayerMap<Value>> {
+pub fn get(self: &Self) -> anyhow::Result<&HistoricLayerCoverage<Value>> {
// NOTE we error here instead of implicitly rebuilding because
// rebuilding is somewhat expensive.
// TODO maybe implicitly rebuild and log/sentry an error?
@@ -362,13 +363,13 @@ impl<Value: Clone> RetroactiveLayerMap<Value> {
anyhow::bail!("rebuild required")
}
-Ok(&self.map)
+Ok(&self.historic_coverage)
}
}
#[test]
fn test_retroactive_regression_1() {
-let mut map = RetroactiveLayerMap::new();
+let mut map = BufferedHistoricLayerCoverage::new();
map.insert(
0..21267647932558653966460912964485513215,
@@ -388,7 +389,7 @@ fn test_retroactive_regression_1() {
#[test]
fn test_retroactive_simple() {
-let mut map = RetroactiveLayerMap::new();
+let mut map = BufferedHistoricLayerCoverage::new();
// Append some images in increasing LSN order
map.insert(0..5, 100..101, "Image 1".to_string(), true);
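
As with LayerMap above, the wrapper's contract is insert, then rebuild, then query; get() deliberately errors rather than rebuilding implicitly. A short sketch in the spirit of the tests above (the queried key and LSN are illustrative):

let mut map = BufferedHistoricLayerCoverage::new();
map.insert(0..5, 100..101, "Image 1".to_string(), true); // goes into the buffer only
map.rebuild();                                           // replays the buffer into HistoricLayerCoverage
let version = map.get().unwrap().get_version(102).unwrap();
assert_eq!(version.image_coverage.query(3), Some("Image 1".to_string()));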

View File

@@ -16,6 +16,10 @@ pub struct LayerCoverage<Value> {
/// For every change in coverage (as we sweep the key space)
/// we store (lsn.end, value).
///
+/// We use an immutable/persistent tree so that we can keep historic
+/// versions of this coverage without cloning the whole thing and
+/// incurring quadratic memory cost. See HistoricLayerCoverage.
+///
/// We use the Sync version of the map because we want Self to
/// be Sync. Using nonsync might be faster, if we can work with
/// that.
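
The payoff of the persistent tree is that each per-LSN snapshot is an O(1) handle copy with structural sharing, not a deep copy. A minimal illustration of that property, assuming a persistent ordered map such as the im crate's OrdMap (the diff does not name the library it uses):

use im::OrdMap;

let mut nodes: OrdMap<i128, Option<u64>> = OrdMap::new();
nodes.insert(0, Some(1));
let snapshot = nodes.clone(); // O(1): shares structure with `nodes`
nodes.insert(5, Some(2));     // later writes leave `snapshot` untouched
assert_eq!(snapshot.get(&5), None);
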
@@ -132,3 +136,27 @@ impl<Value: Clone> LayerCoverage<Value> {
}
}
}
+/// Image and delta coverage at a specific LSN.
+pub struct LayerCoverageTuple<Value> {
+pub image_coverage: LayerCoverage<Value>,
+pub delta_coverage: LayerCoverage<Value>,
+}
+impl<T: Clone> Default for LayerCoverageTuple<T> {
+fn default() -> Self {
+Self {
+image_coverage: LayerCoverage::default(),
+delta_coverage: LayerCoverage::default(),
+}
+}
+}
+impl<Value: Clone> LayerCoverageTuple<Value> {
+pub fn clone(self: &Self) -> Self {
+Self {
+image_coverage: self.image_coverage.clone(),
+delta_coverage: self.delta_coverage.clone(),
+}
+}
+}